summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2011-05-30 13:37:23 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2011-05-30 13:37:23 +0200
commit058d0b6791ed38f1584481bc5bc103b92a1abff9 (patch)
treed14f9a018e3a99a41a40796788af9bd564ffd6fd
parent9f2424123f141990b63962aaae9f88db9bd8e3e9 (diff)
pad: rework pad block some morerework-pad-block2
-rw-r--r--gst/gstpad.c308
-rw-r--r--gst/gstpad.h28
2 files changed, 196 insertions, 140 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c
index 4678a538d..a8e1c1e94 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -1069,24 +1069,27 @@ gst_pad_is_active (GstPad * pad)
}
/**
- * gst_pad_set_blocked:
- * @pad: the #GstPad to block or unblock
- * @blocked: boolean indicating whether the pad should be blocked or unblocked
- * @callback: #GstPadBlockCallback that will be called when the
+ * gst_pad_block:
+ * @pad: the #GstPad to block
+ * @ide_cb: #GstPadIdleCallback that will be called when the
+ * pad becomes idle
+ * @block_cb: #GstPadBlockCallback that will be called when the
* operation succeeds
- * @user_data: (closure): user data passed to the callback
+ * @user_data: (closure): user data passed to the callbacks
* @destroy_data: #GDestroyNotify for user_data
*
- * Blocks or unblocks the dataflow on a pad. The provided callback
- * is called when the operation succeeds; this happens right before the next
- * attempt at pushing a buffer on the pad.
+ * Blocks the dataflow on a pad.
+ *
+ * First @idle_cb is called when no dataflow is happening on @pad. This can
+ * happen from the thread that calls gst_pad_block() when the pad is currently
+ * idle or from the streaming thread when the pad was busy when this method is
+ * called.
+ *
+ * Then block_cb is called when new data is pushed on a pad and is blocked. This
+ * can take a while as there needs to be dataflow on the pad.
*
- * This can take a while as the pad can only become blocked when real dataflow
- * is happening.
* When the pipeline is stalled, for example in PAUSED, this can
* take an indeterminate amount of time.
- * You can pass NULL as the callback to make this call block. Be careful with
- * this blocking call as it might not return for reasons stated above.
*
* <note>
* Pad block handlers are only called for source pads in push mode
@@ -1099,80 +1102,48 @@ gst_pad_is_active (GstPad * pad)
* MT safe.
*/
gboolean
-gst_pad_set_blocked (GstPad * pad, gboolean blocked,
- GstPadBlockCallback callback, gpointer user_data,
+gst_pad_block (GstPad * pad, GstPadIdleCallback idle_cb,
+ GstPadBlockCallback block_cb, gpointer user_data,
GDestroyNotify destroy_data)
{
- gboolean was_blocked = FALSE;
-
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
GST_OBJECT_LOCK (pad);
- was_blocked = GST_PAD_IS_BLOCKED (pad);
-
- if (G_UNLIKELY (was_blocked == blocked))
+ if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
goto had_right_state;
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
- pad->block_callback = NULL;
- pad->block_data = NULL;
- pad->block_destroy_data = NULL;
+ /* we always need to save the block_cb and user data */
+ pad->block_cb = block_cb;
+ pad->block_data = user_data;
+ pad->block_destroy_data = destroy_data;
- if (blocked) {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
- /* set the blocking flag, future push operations will block */
- GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
+ /* set the blocking flag, future push operations will block */
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
- if (pad->block_destroy_data && pad->block_data)
- pad->block_destroy_data (pad->block_data);
+ if (pad->priv->using > 0) {
+ /* the pad is in use, we can't signal the callback yet. Since we set the
+ * flag above, the last thread to leave the push will do the callback. New
+ * threads going into the push will block. */
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
- if (pad->priv->using > 0) {
- /* the pad is in use, we can't signal the callback yet. Since we set the
- * flag above, the last thread to leave the push will do the callback. New
- * threads going into the push will block. */
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
-
- if (callback == NULL) {
- /* backwards compat, if there is no callback, this method should wait
- * until the pad is blocked. */
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
- GST_PAD_BLOCK_WAIT (pad);
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
- } else {
- /* else save the callback, we will signal from the streaming thread when
- * the last thread using the pad is stopped */
- pad->block_callback = callback;
- pad->block_data = user_data;
- pad->block_destroy_data = destroy_data;
- }
- GST_OBJECT_UNLOCK (pad);
- } else {
- /* the pad is idle now, we can signal the callback now */
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
- GST_OBJECT_UNLOCK (pad);
-
- if (callback) {
- callback (pad, TRUE, user_data);
-
- if (destroy_data)
- destroy_data (user_data);
- }
- }
+ /* else save the callback, we will signal from the streaming thread when
+ * the last thread using the pad is stopped */
+ pad->block_idle_cb = idle_cb;
+ GST_OBJECT_UNLOCK (pad);
} else {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
- GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
- GST_PAD_BLOCK_BROADCAST (pad);
+ /* the pad is idle now, we can signal the idle callback now */
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
+ pad->block_idle_cb = NULL;
GST_OBJECT_UNLOCK (pad);
- if (callback) {
- callback (pad, FALSE, user_data);
-
- if (destroy_data)
- destroy_data (user_data);
+ if (idle_cb) {
+ idle_cb (pad, user_data);
}
}
@@ -1180,8 +1151,7 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked,
had_right_state:
{
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "pad was in right state (%d)", was_blocked);
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was blocked");
GST_OBJECT_UNLOCK (pad);
if (destroy_data)
@@ -1192,6 +1162,49 @@ had_right_state:
}
/**
+ * gst_pad_unblock:
+ * @pad: the #GstPad to unblock
+ *
+ * Unblock @pad. All pending and current pad blocks, if any, are canceled. After
+ * this method, dataflow will continue on @pad.
+ *
+ * MT safe.
+ */
+void
+gst_pad_unblock (GstPad * pad)
+{
+ g_return_if_fail (GST_IS_PAD (pad));
+
+ GST_OBJECT_LOCK (pad);
+
+ if (G_UNLIKELY (!GST_PAD_IS_BLOCKED (pad)))
+ goto had_right_state;
+
+ /* cleanup */
+ if (pad->block_destroy_data && pad->block_data)
+ pad->block_destroy_data (pad->block_data);
+
+ pad->block_idle_cb = NULL;
+ pad->block_cb = NULL;
+ pad->block_data = NULL;
+ pad->block_destroy_data = NULL;
+
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
+ GST_PAD_BLOCK_BROADCAST (pad);
+ GST_OBJECT_UNLOCK (pad);
+
+ return;
+
+had_right_state:
+ {
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was unblocked");
+ GST_OBJECT_UNLOCK (pad);
+ return;
+ }
+}
+
+/**
* gst_pad_is_blocked:
* @pad: the #GstPad to query
*
@@ -3836,50 +3849,62 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list)
static GstFlowReturn
pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data)
{
- gboolean need_probes, do_probes = TRUE;
+ gboolean need_probes, do_probes = TRUE, do_callback;
again:
GST_OBJECT_LOCK (pad);
+ do_callback = TRUE;
do {
+ GstPadBlockCallback callback;
+ gpointer user_data;
+
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
- need_probes = do_probes && (GST_PAD_DO_BUFFER_SIGNALS (pad) > 0);
+ if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
+ break;
- /* we emit signals on the pad arg, the peer will have a chance to
- * emit in the _chain() function */
- if (G_UNLIKELY (need_probes)) {
- /* don't do probes next time */
- do_probes = FALSE;
- /* unlock before emitting */
+ callback = pad->block_cb;
+ user_data = pad->block_data;
+ if (do_callback && callback) {
+ do_callback = FALSE;
GST_OBJECT_UNLOCK (pad);
- /* if the signal handler returned FALSE, it means we should just drop the
- * buffer */
- /* FIXME, we need more return values so that we can influence the pad
- * block below and let it temporarily unblock for this buffer */
- if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
- goto dropped;
+ callback (pad, GST_MINI_OBJECT_CAST (data), user_data);
- /* we released the lock, recheck everything */
- goto again;
+ /* we released the log, recheck */
+ GST_OBJECT_LOCK (pad);
+ } else {
+ /* now we block the streaming thread. It can be unlocked when we
+ * deactivate the pad (which will also set the FLUSHING flag) or
+ * when the pad is unblocked. A flushing event will also unblock
+ * the pad after setting the FLUSHING flag. */
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "Waiting to be unblocked or set flushing");
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
+ GST_PAD_BLOCK_WAIT (pad);
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
}
+ } while (TRUE);
- /* when we get here, the item is not dropped by the probe, if we are
- * blocking, we now need to wait until unblocked */
- if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
- break;
+ need_probes = do_probes && (GST_PAD_DO_BUFFER_SIGNALS (pad) > 0);
- /* now we block the streaming thread. It can be unlocked when we
- * deactivate the pad (which will also set the FLUSHING flag) or
- * when the pad is unblocked. A flushing event will also unblock
- * the pad after setting the FLUSHING flag. */
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "Waiting to be unblocked or set flushing");
- GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
- GST_PAD_BLOCK_WAIT (pad);
- GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
- } while (TRUE);
+ /* we emit signals on the pad arg, the peer will have a chance to
+ * emit in the _chain() function */
+ if (G_UNLIKELY (need_probes)) {
+ /* don't do probes the next time */
+ do_probes = FALSE;
+ /* unlock before emitting */
+ GST_OBJECT_UNLOCK (pad);
+
+ /* if the signal handler returned FALSE, it means we should just drop the
+ * buffer */
+ if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
+ goto dropped;
+
+ /* we released the lock, recheck everything */
+ goto again;
+ }
if (G_UNLIKELY ((*peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
@@ -3920,16 +3945,16 @@ pad_post_push (GstPad * pad)
if (pad->priv->using == 0) {
/* pad is not active anymore, check if we need to trigger the block */
if (GST_PAD_IS_BLOCKED (pad)) {
- GstPadBlockCallback callback;
+ GstPadIdleCallback callback;
gpointer user_data;
- callback = pad->block_callback;
+ callback = pad->block_idle_cb;
user_data = pad->block_data;
GST_PAD_BLOCK_BROADCAST (pad);
GST_OBJECT_UNLOCK (pad);
if (callback)
- callback (pad, TRUE, user_data);
+ callback (pad, user_data);
return;
}
@@ -4306,7 +4331,8 @@ gboolean
gst_pad_push_event (GstPad * pad, GstEvent * event)
{
GstPad *peerpad;
- gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE;
+ gboolean result, need_probes, do_probes = TRUE, do_event_actions =
+ TRUE, do_callback;
gboolean stored = FALSE;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
@@ -4415,46 +4441,60 @@ again:
}
}
+ do_callback = TRUE;
do {
- /* we emit signals on the pad arg, the peer will have a chance to
- * emit in the _chain() function */
- if (G_UNLIKELY (need_probes)) {
- /* don't do probes next time */
- do_probes = FALSE;
- /* unlock before emitting */
- GST_OBJECT_UNLOCK (pad);
+ GstPadBlockCallback callback;
+ gpointer user_data;
- /* if the signal handler returned FALSE, it means we should just drop the
- * buffer */
- /* FIXME, we need more return values so that we can influence the pad
- * block below and let it temporarily unblock for this buffer */
- if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
- goto dropped;
-
- /* we released the lock, recheck everything */
- goto again;
- }
+ if (GST_PAD_IS_FLUSHING (pad))
+ goto flushed;
- /* when we get here, the item is not dropped by the probe, if we are
- * blocking, we now need to wait until unblocked */
+ /* if we are blocking, we now need to wait until unblocked */
if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
break;
- /* now we block the streaming thread. It can be unlocked when we
- * deactivate the pad (which will also set the FLUSHING flag) or
- * when the pad is unblocked. A flushing event will also unblock
- * the pad after setting the FLUSHING flag. */
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "Waiting to be unblocked or set flushing");
- GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
- GST_PAD_BLOCK_WAIT (pad);
- GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
+ callback = pad->block_cb;
+ user_data = pad->block_data;
+ if (do_callback && callback) {
+ do_callback = FALSE;
+ GST_OBJECT_UNLOCK (pad);
- if (GST_PAD_IS_FLUSHING (pad))
- goto flushed;
+ callback (pad, GST_MINI_OBJECT_CAST (event), user_data);
+ /* we released the log, recheck */
+ GST_OBJECT_LOCK (pad);
+ } else {
+ /* now we block the streaming thread. It can be unlocked when we
+ * deactivate the pad (which will also set the FLUSHING flag) or
+ * when the pad is unblocked. A flushing event will also unblock
+ * the pad after setting the FLUSHING flag. */
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "Waiting to be unblocked or set flushing");
+ GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
+ GST_PAD_BLOCK_WAIT (pad);
+ GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
+ }
} while (TRUE);
+ /* we emit signals on the pad arg, the peer will have a chance to
+ * emit in the _chain() function */
+ if (G_UNLIKELY (need_probes)) {
+ /* don't do probes next time */
+ do_probes = FALSE;
+ /* unlock before emitting */
+ GST_OBJECT_UNLOCK (pad);
+
+ /* if the signal handler returned FALSE, it means we should just drop the
+ * buffer */
+ /* FIXME, we need more return values so that we can influence the pad
+ * block below and let it temporarily unblock for this buffer */
+ if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
+ goto dropped;
+
+ /* we released the lock, recheck everything */
+ goto again;
+ }
+
/* now check the peer pad */
if (peerpad == NULL)
goto not_linked;
diff --git a/gst/gstpad.h b/gst/gstpad.h
index 970e1ef51..5fad9ba71 100644
--- a/gst/gstpad.h
+++ b/gst/gstpad.h
@@ -471,15 +471,26 @@ typedef void (*GstPadFixateCapsFunction) (GstPad *pad, GstCaps *caps);
typedef gboolean (*GstPadDispatcherFunction) (GstPad *pad, gpointer data);
/**
+ * GstPadIdleCallback:
+ * @pad: the #GstPad that is idle
+ * @user_data: the gpointer to optional user data.
+ *
+ * Callback used by gst_pad_set_blocked(). Gets called when no data is flowing
+ * over the pad. This callback can be called either from the streaming thread or
+ * the thread that blocked the pad.
+ */
+typedef void (*GstPadIdleCallback) (GstPad *pad, gpointer user_data);
+
+/**
* GstPadBlockCallback:
* @pad: the #GstPad that is blockend or unblocked.
- * @blocked: blocking state for the pad
+ * @object: the object that is blocked
* @user_data: the gpointer to optional user data.
*
- * Callback used by gst_pad_set_blocked_async(). Gets called when the blocking
+ * Callback used by gst_pad_set_blocked(). Gets called when the blocking
* operation succeeds.
*/
-typedef void (*GstPadBlockCallback) (GstPad *pad, gboolean blocked, gpointer user_data);
+typedef void (*GstPadBlockCallback) (GstPad *pad, GstMiniObject *object, gpointer user_data);
/**
* GstPadStickyEventsForeachFunction:
@@ -600,7 +611,8 @@ struct _GstPad {
/*< public >*/ /* with LOCK */
/* block cond, mutex is from the object */
GCond *block_cond;
- GstPadBlockCallback block_callback;
+ GstPadIdleCallback block_idle_cb;
+ GstPadBlockCallback block_cb;
gpointer block_data;
GDestroyNotify block_destroy_data;
gboolean block_callback_called;
@@ -804,9 +816,13 @@ gboolean gst_pad_is_active (GstPad *pad);
gboolean gst_pad_activate_pull (GstPad *pad, gboolean active);
gboolean gst_pad_activate_push (GstPad *pad, gboolean active);
-gboolean gst_pad_set_blocked (GstPad *pad, gboolean blocked,
- GstPadBlockCallback callback, gpointer user_data,
+gboolean gst_pad_block (GstPad *pad,
+ GstPadIdleCallback idle_cb,
+ GstPadBlockCallback block_cb,
+ gpointer user_data,
GDestroyNotify destroy_data);
+void gst_pad_unblock (GstPad *pad);
+
gboolean gst_pad_is_blocked (GstPad *pad);
gboolean gst_pad_is_blocking (GstPad *pad);