summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/gst/gstreamer-sections.txt2
-rw-r--r--gst/gsttask.c9
-rw-r--r--gst/gsttaskpool.c60
-rw-r--r--gst/gsttaskpool.h19
-rw-r--r--tests/examples/streams/.gitignore1
-rw-r--r--tests/examples/streams/testrtpool.c48
-rw-r--r--win32/common/libgstreamer.def1
7 files changed, 96 insertions, 44 deletions
diff --git a/docs/gst/gstreamer-sections.txt b/docs/gst/gstreamer-sections.txt
index 1fd6afdd12..7a265c271f 100644
--- a/docs/gst/gstreamer-sections.txt
+++ b/docs/gst/gstreamer-sections.txt
@@ -2173,8 +2173,8 @@ gst_tag_setter_get_type
<TITLE>GstTaskPool</TITLE>
GstTaskPool
GstTaskPoolClass
+GstTaskPoolFunction
gst_task_pool_new
-gst_task_pool_set_func
gst_task_pool_prepare
gst_task_pool_push
gst_task_pool_join
diff --git a/gst/gsttask.c b/gst/gsttask.c
index 3aec20ffd0..dc361b5bce 100644
--- a/gst/gsttask.c
+++ b/gst/gsttask.c
@@ -92,7 +92,7 @@ static void gst_task_class_init (GstTaskClass * klass);
static void gst_task_init (GstTask * task);
static void gst_task_finalize (GObject * object);
-static void gst_task_func (GstTask * task, GstTaskClass * tclass);
+static void gst_task_func (GstTask * task);
static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
@@ -112,7 +112,6 @@ init_klass_pool (GstTaskClass * klass)
gst_object_unref (klass->pool);
}
klass->pool = gst_task_pool_new ();
- gst_task_pool_set_func (klass->pool, (GFunc) gst_task_func, klass);
gst_task_pool_prepare (klass->pool, NULL);
g_static_mutex_unlock (&pool_lock);
}
@@ -177,7 +176,7 @@ gst_task_finalize (GObject * object)
}
static void
-gst_task_func (GstTask * task, GstTaskClass * tclass)
+gst_task_func (GstTask * task)
{
GStaticRecMutex *lock;
GThread *tself;
@@ -564,7 +563,9 @@ start_task (GstTask * task)
/* push on the thread pool, we remember the original pool because the user
* could change it later on and then we join to the wrong pool. */
priv->pool_id = gst_object_ref (priv->pool);
- priv->id = gst_task_pool_push (priv->pool_id, task, &error);
+ priv->id =
+ gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func,
+ task, &error);
if (error != NULL) {
g_warning ("failed to create thread: %s", error->message);
diff --git a/gst/gsttaskpool.c b/gst/gsttaskpool.c
index 12e8f4ad70..e17af0bfbb 100644
--- a/gst/gsttaskpool.c
+++ b/gst/gsttaskpool.c
@@ -51,12 +51,30 @@ static void gst_task_pool_finalize (GObject * object);
G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init);
+typedef struct
+{
+ GstTaskPoolFunction func;
+ gpointer user_data;
+} TaskData;
+
static void
-default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data,
- GError ** error)
+default_func (TaskData * tdata, GstTaskPool * pool)
+{
+ GstTaskPoolFunction func;
+ gpointer user_data;
+
+ func = tdata->func;
+ user_data = tdata->user_data;
+ g_slice_free (TaskData, tdata);
+
+ func (user_data);
+}
+
+static void
+default_prepare (GstTaskPool * pool, GError ** error)
{
GST_OBJECT_LOCK (pool);
- pool->pool = g_thread_pool_new ((GFunc) func, user_data, -1, FALSE, NULL);
+ pool->pool = g_thread_pool_new ((GFunc) default_func, pool, -1, FALSE, NULL);
GST_OBJECT_UNLOCK (pool);
}
@@ -75,11 +93,21 @@ default_cleanup (GstTaskPool * pool)
}
static gpointer
-default_push (GstTaskPool * pool, gpointer data, GError ** error)
+default_push (GstTaskPool * pool, GstTaskPoolFunction func,
+ gpointer user_data, GError ** error)
{
+ TaskData *tdata;
+
+ tdata = g_slice_new (TaskData);
+ tdata->func = func;
+ tdata->user_data = user_data;
+
GST_OBJECT_LOCK (pool);
if (pool->pool)
- g_thread_pool_push (pool->pool, data, error);
+ g_thread_pool_push (pool->pool, tdata, error);
+ else {
+ g_slice_free (TaskData, tdata);
+ }
GST_OBJECT_UNLOCK (pool);
return NULL;
@@ -88,7 +116,7 @@ default_push (GstTaskPool * pool, gpointer data, GError ** error)
static void
default_join (GstTaskPool * pool, gpointer id)
{
- /* does nothing, we can't join for threads from the threadpool */
+ /* we do nothing here, we can't join from the pools */
}
static void
@@ -141,16 +169,6 @@ gst_task_pool_new (void)
return pool;
}
-void
-gst_task_pool_set_func (GstTaskPool * pool, GFunc func, gpointer user_data)
-{
- g_return_if_fail (GST_IS_TASK_POOL (pool));
-
- pool->func = func;
- pool->user_data = user_data;
-}
-
-
/**
* gst_task_pool_prepare:
* @pool: a #GstTaskPool
@@ -170,7 +188,7 @@ gst_task_pool_prepare (GstTaskPool * pool, GError ** error)
klass = GST_TASK_POOL_GET_CLASS (pool);
if (klass->prepare)
- klass->prepare (pool, pool->func, pool->user_data, error);
+ klass->prepare (pool, error);
}
/**
@@ -198,7 +216,8 @@ gst_task_pool_cleanup (GstTaskPool * pool)
/**
* gst_task_pool_push:
* @pool: a #GstTaskPool
- * @data: data to pass to the thread function
+ * @func: the function to call
+ * @user_data: data to pass to @func
* @error: return location for an error
*
* Start the execution of a new thread from @pool.
@@ -208,7 +227,8 @@ gst_task_pool_cleanup (GstTaskPool * pool)
* errors.
*/
gpointer
-gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error)
+gst_task_pool_push (GstTaskPool * pool, GstTaskPoolFunction func,
+ gpointer user_data, GError ** error)
{
GstTaskPoolClass *klass;
@@ -219,7 +239,7 @@ gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error)
if (klass->push == NULL)
goto not_supported;
- return klass->push (pool, data, error);
+ return klass->push (pool, func, user_data, error);
/* ERRORS */
not_supported:
diff --git a/gst/gsttaskpool.h b/gst/gsttaskpool.h
index d9f922d43b..eda5c6ca23 100644
--- a/gst/gsttaskpool.h
+++ b/gst/gsttaskpool.h
@@ -38,6 +38,8 @@ G_BEGIN_DECLS
typedef struct _GstTaskPool GstTaskPool;
typedef struct _GstTaskPoolClass GstTaskPoolClass;
+typedef void (*GstTaskPoolFunction) (void *data);
+
/**
* GstTaskPool:
*
@@ -47,9 +49,6 @@ struct _GstTaskPool {
GstObject object;
/*< private >*/
- GFunc func;
- gpointer user_data;
-
GThreadPool *pool;
gpointer _gst_reserved[GST_PADDING];
@@ -69,11 +68,11 @@ struct _GstTaskPoolClass {
GstObjectClass parent_class;
/*< public >*/
- void (*prepare) (GstTaskPool *pool, GFunc func,
- gpointer user_data, GError **error);
+ void (*prepare) (GstTaskPool *pool, GError **error);
void (*cleanup) (GstTaskPool *pool);
- gpointer (*push) (GstTaskPool *pool, gpointer data, GError **error);
+ gpointer (*push) (GstTaskPool *pool, GstTaskPoolFunction func,
+ gpointer user_data, GError **error);
void (*join) (GstTaskPool *pool, gpointer id);
/*< private >*/
@@ -83,14 +82,10 @@ struct _GstTaskPoolClass {
GType gst_task_pool_get_type (void);
GstTaskPool * gst_task_pool_new (void);
-
-void gst_task_pool_set_func (GstTaskPool *pool,
- GFunc func, gpointer user_data);
-
void gst_task_pool_prepare (GstTaskPool *pool, GError **error);
-gpointer gst_task_pool_push (GstTaskPool *pool, gpointer data,
- GError **error);
+gpointer gst_task_pool_push (GstTaskPool *pool, GstTaskPoolFunction func,
+ gpointer user_data, GError **error);
void gst_task_pool_join (GstTaskPool *pool, gpointer id);
void gst_task_pool_cleanup (GstTaskPool *pool);
diff --git a/tests/examples/streams/.gitignore b/tests/examples/streams/.gitignore
index 99261ec4b7..d8cfe30d07 100644
--- a/tests/examples/streams/.gitignore
+++ b/tests/examples/streams/.gitignore
@@ -1,4 +1,5 @@
stream-status
+rtpool-test
*.bb
*.bbg
*.da
diff --git a/tests/examples/streams/testrtpool.c b/tests/examples/streams/testrtpool.c
index b010b1ebb7..41cd4edf25 100644
--- a/tests/examples/streams/testrtpool.c
+++ b/tests/examples/streams/testrtpool.c
@@ -17,18 +17,26 @@
* Boston, MA 02111-1307, USA.
*/
+#include <pthread.h>
+
#include "testrtpool.h"
static void test_rt_pool_class_init (TestRTPoolClass * klass);
static void test_rt_pool_init (TestRTPool * pool);
static void test_rt_pool_finalize (GObject * object);
+typedef struct
+{
+ pthread_t thread;
+} TestRTId;
+
G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);
static void
-default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data,
- GError ** error)
+default_prepare (GstTaskPool * pool, GError ** error)
{
+ /* we don't do anything here. We could construct a pool of threads here that
+ * we could reuse later but we don't */
g_message ("prepare Realtime pool %p", pool);
}
@@ -39,19 +47,47 @@ default_cleanup (GstTaskPool * pool)
}
static gpointer
-default_push (GstTaskPool * pool, gpointer data, GError ** error)
+default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
+ GError ** error)
{
- g_message ("pushing Realtime pool %p", pool);
+ TestRTId *tid;
+ gint res;
+ pthread_attr_t attr;
+ //struct sched_param param;
+
+ g_message ("pushing Realtime pool %p, %p", pool, func);
- *error = g_error_new (1, 1, "not supported");
+ tid = g_slice_new0 (TestRTId);
- return NULL;
+ pthread_attr_init (&attr);
+ /*
+ pthread_attr_setschedpolicy (&attr, SCHED_RR);
+ param.sched_priority = 50;
+ pthread_attr_setschedparam (&attr, &param);
+ */
+
+ res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);
+
+ if (res < 0) {
+ g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
+ "Error creating thread: %s", g_strerror (res));
+ g_slice_free (TestRTId, tid);
+ tid = NULL;
+ }
+
+ return tid;
}
static void
default_join (GstTaskPool * pool, gpointer id)
{
+ TestRTId *tid = (TestRTId *) id;
+
g_message ("joining Realtime pool %p", pool);
+
+ pthread_join (tid->thread, NULL);
+
+ g_slice_free (TestRTId, tid);
}
static void
diff --git a/win32/common/libgstreamer.def b/win32/common/libgstreamer.def
index 5b4225a0e1..262b52a894 100644
--- a/win32/common/libgstreamer.def
+++ b/win32/common/libgstreamer.def
@@ -950,7 +950,6 @@ EXPORTS
gst_task_pool_new
gst_task_pool_prepare
gst_task_pool_push
- gst_task_pool_set_func
gst_task_set_lock
gst_task_set_pool
gst_task_set_priority