diff options
-rw-r--r-- | docs/gst/gstreamer-sections.txt | 2 | ||||
-rw-r--r-- | gst/gsttask.c | 9 | ||||
-rw-r--r-- | gst/gsttaskpool.c | 60 | ||||
-rw-r--r-- | gst/gsttaskpool.h | 19 | ||||
-rw-r--r-- | tests/examples/streams/.gitignore | 1 | ||||
-rw-r--r-- | tests/examples/streams/testrtpool.c | 48 | ||||
-rw-r--r-- | win32/common/libgstreamer.def | 1 |
7 files changed, 96 insertions, 44 deletions
diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt index 1fd6afdd12..7a265c271f 100644 --- a/docs/gst/gstreamer-sections.txt +++ b/docs/gst/gstreamer-sections.txt @@ -2173,8 +2173,8 @@ gst_tag_setter_get_type <TITLE>GstTaskPool</TITLE> GstTaskPool GstTaskPoolClass +GstTaskPoolFunction gst_task_pool_new -gst_task_pool_set_func gst_task_pool_prepare gst_task_pool_push gst_task_pool_join diff --git a/gst/gsttask.c b/gst/gsttask.c index 3aec20ffd0..dc361b5bce 100644 --- a/gst/gsttask.c +++ b/gst/gsttask.c @@ -92,7 +92,7 @@ static void gst_task_class_init (GstTaskClass * klass); static void gst_task_init (GstTask * task); static void gst_task_finalize (GObject * object); -static void gst_task_func (GstTask * task, GstTaskClass * tclass); +static void gst_task_func (GstTask * task); static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; @@ -112,7 +112,6 @@ init_klass_pool (GstTaskClass * klass) gst_object_unref (klass->pool); } klass->pool = gst_task_pool_new (); - gst_task_pool_set_func (klass->pool, (GFunc) gst_task_func, klass); gst_task_pool_prepare (klass->pool, NULL); g_static_mutex_unlock (&pool_lock); } @@ -177,7 +176,7 @@ gst_task_finalize (GObject * object) } static void -gst_task_func (GstTask * task, GstTaskClass * tclass) +gst_task_func (GstTask * task) { GStaticRecMutex *lock; GThread *tself; @@ -564,7 +563,9 @@ start_task (GstTask * task) /* push on the thread pool, we remember the original pool because the user * could change it later on and then we join to the wrong pool. */ priv->pool_id = gst_object_ref (priv->pool); - priv->id = gst_task_pool_push (priv->pool_id, task, &error); + priv->id = + gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func, + task, &error); if (error != NULL) { g_warning ("failed to create thread: %s", error->message); diff --git a/gst/gsttaskpool.c b/gst/gsttaskpool.c index 12e8f4ad70..e17af0bfbb 100644 --- a/gst/gsttaskpool.c +++ b/gst/gsttaskpool.c @@ -51,12 +51,30 @@ static void gst_task_pool_finalize (GObject * object); G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init); +typedef struct +{ + GstTaskPoolFunction func; + gpointer user_data; +} TaskData; + static void -default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data, - GError ** error) +default_func (TaskData * tdata, GstTaskPool * pool) +{ + GstTaskPoolFunction func; + gpointer user_data; + + func = tdata->func; + user_data = tdata->user_data; + g_slice_free (TaskData, tdata); + + func (user_data); +} + +static void +default_prepare (GstTaskPool * pool, GError ** error) { GST_OBJECT_LOCK (pool); - pool->pool = g_thread_pool_new ((GFunc) func, user_data, -1, FALSE, NULL); + pool->pool = g_thread_pool_new ((GFunc) default_func, pool, -1, FALSE, NULL); GST_OBJECT_UNLOCK (pool); } @@ -75,11 +93,21 @@ default_cleanup (GstTaskPool * pool) } static gpointer -default_push (GstTaskPool * pool, gpointer data, GError ** error) +default_push (GstTaskPool * pool, GstTaskPoolFunction func, + gpointer user_data, GError ** error) { + TaskData *tdata; + + tdata = g_slice_new (TaskData); + tdata->func = func; + tdata->user_data = user_data; + GST_OBJECT_LOCK (pool); if (pool->pool) - g_thread_pool_push (pool->pool, data, error); + g_thread_pool_push (pool->pool, tdata, error); + else { + g_slice_free (TaskData, tdata); + } GST_OBJECT_UNLOCK (pool); return NULL; @@ -88,7 +116,7 @@ default_push (GstTaskPool * pool, gpointer data, GError ** error) static void default_join (GstTaskPool * pool, gpointer id) { - /* does nothing, we can't join for threads from the threadpool */ + /* we do nothing here, we can't join from the pools */ } static void @@ -141,16 +169,6 @@ gst_task_pool_new (void) return pool; } -void -gst_task_pool_set_func (GstTaskPool * pool, GFunc func, gpointer user_data) -{ - g_return_if_fail (GST_IS_TASK_POOL (pool)); - - pool->func = func; - pool->user_data = user_data; -} - - /** * gst_task_pool_prepare: * @pool: a #GstTaskPool @@ -170,7 +188,7 @@ gst_task_pool_prepare (GstTaskPool * pool, GError ** error) klass = GST_TASK_POOL_GET_CLASS (pool); if (klass->prepare) - klass->prepare (pool, pool->func, pool->user_data, error); + klass->prepare (pool, error); } /** @@ -198,7 +216,8 @@ gst_task_pool_cleanup (GstTaskPool * pool) /** * gst_task_pool_push: * @pool: a #GstTaskPool - * @data: data to pass to the thread function + * @func: the function to call + * @user_data: data to pass to @func * @error: return location for an error * * Start the execution of a new thread from @pool. @@ -208,7 +227,8 @@ gst_task_pool_cleanup (GstTaskPool * pool) * errors. */ gpointer -gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error) +gst_task_pool_push (GstTaskPool * pool, GstTaskPoolFunction func, + gpointer user_data, GError ** error) { GstTaskPoolClass *klass; @@ -219,7 +239,7 @@ gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error) if (klass->push == NULL) goto not_supported; - return klass->push (pool, data, error); + return klass->push (pool, func, user_data, error); /* ERRORS */ not_supported: diff --git a/gst/gsttaskpool.h b/gst/gsttaskpool.h index d9f922d43b..eda5c6ca23 100644 --- a/gst/gsttaskpool.h +++ b/gst/gsttaskpool.h @@ -38,6 +38,8 @@ G_BEGIN_DECLS typedef struct _GstTaskPool GstTaskPool; typedef struct _GstTaskPoolClass GstTaskPoolClass; +typedef void (*GstTaskPoolFunction) (void *data); + /** * GstTaskPool: * @@ -47,9 +49,6 @@ struct _GstTaskPool { GstObject object; /*< private >*/ - GFunc func; - gpointer user_data; - GThreadPool *pool; gpointer _gst_reserved[GST_PADDING]; @@ -69,11 +68,11 @@ struct _GstTaskPoolClass { GstObjectClass parent_class; /*< public >*/ - void (*prepare) (GstTaskPool *pool, GFunc func, - gpointer user_data, GError **error); + void (*prepare) (GstTaskPool *pool, GError **error); void (*cleanup) (GstTaskPool *pool); - gpointer (*push) (GstTaskPool *pool, gpointer data, GError **error); + gpointer (*push) (GstTaskPool *pool, GstTaskPoolFunction func, + gpointer user_data, GError **error); void (*join) (GstTaskPool *pool, gpointer id); /*< private >*/ @@ -83,14 +82,10 @@ struct _GstTaskPoolClass { GType gst_task_pool_get_type (void); GstTaskPool * gst_task_pool_new (void); - -void gst_task_pool_set_func (GstTaskPool *pool, - GFunc func, gpointer user_data); - void gst_task_pool_prepare (GstTaskPool *pool, GError **error); -gpointer gst_task_pool_push (GstTaskPool *pool, gpointer data, - GError **error); +gpointer gst_task_pool_push (GstTaskPool *pool, GstTaskPoolFunction func, + gpointer user_data, GError **error); void gst_task_pool_join (GstTaskPool *pool, gpointer id); void gst_task_pool_cleanup (GstTaskPool *pool); diff --git a/tests/examples/streams/.gitignore b/tests/examples/streams/.gitignore index 99261ec4b7..d8cfe30d07 100644 --- a/tests/examples/streams/.gitignore +++ b/tests/examples/streams/.gitignore @@ -1,4 +1,5 @@ stream-status +rtpool-test *.bb *.bbg *.da diff --git a/tests/examples/streams/testrtpool.c b/tests/examples/streams/testrtpool.c index b010b1ebb7..41cd4edf25 100644 --- a/tests/examples/streams/testrtpool.c +++ b/tests/examples/streams/testrtpool.c @@ -17,18 +17,26 @@ * Boston, MA 02111-1307, USA. */ +#include <pthread.h> + #include "testrtpool.h" static void test_rt_pool_class_init (TestRTPoolClass * klass); static void test_rt_pool_init (TestRTPool * pool); static void test_rt_pool_finalize (GObject * object); +typedef struct +{ + pthread_t thread; +} TestRTId; + G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL); static void -default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data, - GError ** error) +default_prepare (GstTaskPool * pool, GError ** error) { + /* we don't do anything here. We could construct a pool of threads here that + * we could reuse later but we don't */ g_message ("prepare Realtime pool %p", pool); } @@ -39,19 +47,47 @@ default_cleanup (GstTaskPool * pool) } static gpointer -default_push (GstTaskPool * pool, gpointer data, GError ** error) +default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data, + GError ** error) { - g_message ("pushing Realtime pool %p", pool); + TestRTId *tid; + gint res; + pthread_attr_t attr; + //struct sched_param param; + + g_message ("pushing Realtime pool %p, %p", pool, func); - *error = g_error_new (1, 1, "not supported"); + tid = g_slice_new0 (TestRTId); - return NULL; + pthread_attr_init (&attr); + /* + pthread_attr_setschedpolicy (&attr, SCHED_RR); + param.sched_priority = 50; + pthread_attr_setschedparam (&attr, ¶m); + */ + + res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data); + + if (res < 0) { + g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN, + "Error creating thread: %s", g_strerror (res)); + g_slice_free (TestRTId, tid); + tid = NULL; + } + + return tid; } static void default_join (GstTaskPool * pool, gpointer id) { + TestRTId *tid = (TestRTId *) id; + g_message ("joining Realtime pool %p", pool); + + pthread_join (tid->thread, NULL); + + g_slice_free (TestRTId, tid); } static void diff --git a/win32/common/libgstreamer.def b/win32/common/libgstreamer.def index 5b4225a0e1..262b52a894 100644 --- a/win32/common/libgstreamer.def +++ b/win32/common/libgstreamer.def @@ -950,7 +950,6 @@ EXPORTS gst_task_pool_new gst_task_pool_prepare gst_task_pool_push - gst_task_pool_set_func gst_task_set_lock gst_task_set_pool gst_task_set_priority |