diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2011-05-30 13:37:23 +0200 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2011-05-30 13:37:23 +0200 |
commit | 058d0b6791ed38f1584481bc5bc103b92a1abff9 (patch) | |
tree | d14f9a018e3a99a41a40796788af9bd564ffd6fd | |
parent | 9f2424123f141990b63962aaae9f88db9bd8e3e9 (diff) |
pad: rework pad block some morerework-pad-block2
-rw-r--r-- | gst/gstpad.c | 308 | ||||
-rw-r--r-- | gst/gstpad.h | 28 |
2 files changed, 196 insertions, 140 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c index 4678a538d..a8e1c1e94 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -1069,24 +1069,27 @@ gst_pad_is_active (GstPad * pad) } /** - * gst_pad_set_blocked: - * @pad: the #GstPad to block or unblock - * @blocked: boolean indicating whether the pad should be blocked or unblocked - * @callback: #GstPadBlockCallback that will be called when the + * gst_pad_block: + * @pad: the #GstPad to block + * @ide_cb: #GstPadIdleCallback that will be called when the + * pad becomes idle + * @block_cb: #GstPadBlockCallback that will be called when the * operation succeeds - * @user_data: (closure): user data passed to the callback + * @user_data: (closure): user data passed to the callbacks * @destroy_data: #GDestroyNotify for user_data * - * Blocks or unblocks the dataflow on a pad. The provided callback - * is called when the operation succeeds; this happens right before the next - * attempt at pushing a buffer on the pad. + * Blocks the dataflow on a pad. + * + * First @idle_cb is called when no dataflow is happening on @pad. This can + * happen from the thread that calls gst_pad_block() when the pad is currently + * idle or from the streaming thread when the pad was busy when this method is + * called. + * + * Then block_cb is called when new data is pushed on a pad and is blocked. This + * can take a while as there needs to be dataflow on the pad. * - * This can take a while as the pad can only become blocked when real dataflow - * is happening. * When the pipeline is stalled, for example in PAUSED, this can * take an indeterminate amount of time. - * You can pass NULL as the callback to make this call block. Be careful with - * this blocking call as it might not return for reasons stated above. * * <note> * Pad block handlers are only called for source pads in push mode @@ -1099,80 +1102,48 @@ gst_pad_is_active (GstPad * pad) * MT safe. */ gboolean -gst_pad_set_blocked (GstPad * pad, gboolean blocked, - GstPadBlockCallback callback, gpointer user_data, +gst_pad_block (GstPad * pad, GstPadIdleCallback idle_cb, + GstPadBlockCallback block_cb, gpointer user_data, GDestroyNotify destroy_data) { - gboolean was_blocked = FALSE; - g_return_val_if_fail (GST_IS_PAD (pad), FALSE); GST_OBJECT_LOCK (pad); - was_blocked = GST_PAD_IS_BLOCKED (pad); - - if (G_UNLIKELY (was_blocked == blocked)) + if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) goto had_right_state; if (pad->block_destroy_data && pad->block_data) pad->block_destroy_data (pad->block_data); - pad->block_callback = NULL; - pad->block_data = NULL; - pad->block_destroy_data = NULL; + /* we always need to save the block_cb and user data */ + pad->block_cb = block_cb; + pad->block_data = user_data; + pad->block_destroy_data = destroy_data; - if (blocked) { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad"); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad"); - /* set the blocking flag, future push operations will block */ - GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); + /* set the blocking flag, future push operations will block */ + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); - if (pad->block_destroy_data && pad->block_data) - pad->block_destroy_data (pad->block_data); + if (pad->priv->using > 0) { + /* the pad is in use, we can't signal the callback yet. Since we set the + * flag above, the last thread to leave the push will do the callback. New + * threads going into the push will block. */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use"); - if (pad->priv->using > 0) { - /* the pad is in use, we can't signal the callback yet. Since we set the - * flag above, the last thread to leave the push will do the callback. New - * threads going into the push will block. */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use"); - - if (callback == NULL) { - /* backwards compat, if there is no callback, this method should wait - * until the pad is blocked. */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block"); - GST_PAD_BLOCK_WAIT (pad); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked"); - } else { - /* else save the callback, we will signal from the streaming thread when - * the last thread using the pad is stopped */ - pad->block_callback = callback; - pad->block_data = user_data; - pad->block_destroy_data = destroy_data; - } - GST_OBJECT_UNLOCK (pad); - } else { - /* the pad is idle now, we can signal the callback now */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle"); - GST_OBJECT_UNLOCK (pad); - - if (callback) { - callback (pad, TRUE, user_data); - - if (destroy_data) - destroy_data (user_data); - } - } + /* else save the callback, we will signal from the streaming thread when + * the last thread using the pad is stopped */ + pad->block_idle_cb = idle_cb; + GST_OBJECT_UNLOCK (pad); } else { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad"); - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED); - GST_PAD_BLOCK_BROADCAST (pad); + /* the pad is idle now, we can signal the idle callback now */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle"); + pad->block_idle_cb = NULL; GST_OBJECT_UNLOCK (pad); - if (callback) { - callback (pad, FALSE, user_data); - - if (destroy_data) - destroy_data (user_data); + if (idle_cb) { + idle_cb (pad, user_data); } } @@ -1180,8 +1151,7 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked, had_right_state: { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pad was in right state (%d)", was_blocked); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was blocked"); GST_OBJECT_UNLOCK (pad); if (destroy_data) @@ -1192,6 +1162,49 @@ had_right_state: } /** + * gst_pad_unblock: + * @pad: the #GstPad to unblock + * + * Unblock @pad. All pending and current pad blocks, if any, are canceled. After + * this method, dataflow will continue on @pad. + * + * MT safe. + */ +void +gst_pad_unblock (GstPad * pad) +{ + g_return_if_fail (GST_IS_PAD (pad)); + + GST_OBJECT_LOCK (pad); + + if (G_UNLIKELY (!GST_PAD_IS_BLOCKED (pad))) + goto had_right_state; + + /* cleanup */ + if (pad->block_destroy_data && pad->block_data) + pad->block_destroy_data (pad->block_data); + + pad->block_idle_cb = NULL; + pad->block_cb = NULL; + pad->block_data = NULL; + pad->block_destroy_data = NULL; + + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad"); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED); + GST_PAD_BLOCK_BROADCAST (pad); + GST_OBJECT_UNLOCK (pad); + + return; + +had_right_state: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was unblocked"); + GST_OBJECT_UNLOCK (pad); + return; + } +} + +/** * gst_pad_is_blocked: * @pad: the #GstPad to query * @@ -3836,50 +3849,62 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list) static GstFlowReturn pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data) { - gboolean need_probes, do_probes = TRUE; + gboolean need_probes, do_probes = TRUE, do_callback; again: GST_OBJECT_LOCK (pad); + do_callback = TRUE; do { + GstPadBlockCallback callback; + gpointer user_data; + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; - need_probes = do_probes && (GST_PAD_DO_BUFFER_SIGNALS (pad) > 0); + if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad))) + break; - /* we emit signals on the pad arg, the peer will have a chance to - * emit in the _chain() function */ - if (G_UNLIKELY (need_probes)) { - /* don't do probes next time */ - do_probes = FALSE; - /* unlock before emitting */ + callback = pad->block_cb; + user_data = pad->block_data; + if (do_callback && callback) { + do_callback = FALSE; GST_OBJECT_UNLOCK (pad); - /* if the signal handler returned FALSE, it means we should just drop the - * buffer */ - /* FIXME, we need more return values so that we can influence the pad - * block below and let it temporarily unblock for this buffer */ - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data))) - goto dropped; + callback (pad, GST_MINI_OBJECT_CAST (data), user_data); - /* we released the lock, recheck everything */ - goto again; + /* we released the log, recheck */ + GST_OBJECT_LOCK (pad); + } else { + /* now we block the streaming thread. It can be unlocked when we + * deactivate the pad (which will also set the FLUSHING flag) or + * when the pad is unblocked. A flushing event will also unblock + * the pad after setting the FLUSHING flag. */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "Waiting to be unblocked or set flushing"); + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); + GST_PAD_BLOCK_WAIT (pad); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); } + } while (TRUE); - /* when we get here, the item is not dropped by the probe, if we are - * blocking, we now need to wait until unblocked */ - if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad))) - break; + need_probes = do_probes && (GST_PAD_DO_BUFFER_SIGNALS (pad) > 0); - /* now we block the streaming thread. It can be unlocked when we - * deactivate the pad (which will also set the FLUSHING flag) or - * when the pad is unblocked. A flushing event will also unblock - * the pad after setting the FLUSHING flag. */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "Waiting to be unblocked or set flushing"); - GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); - GST_PAD_BLOCK_WAIT (pad); - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); - } while (TRUE); + /* we emit signals on the pad arg, the peer will have a chance to + * emit in the _chain() function */ + if (G_UNLIKELY (need_probes)) { + /* don't do probes the next time */ + do_probes = FALSE; + /* unlock before emitting */ + GST_OBJECT_UNLOCK (pad); + + /* if the signal handler returned FALSE, it means we should just drop the + * buffer */ + if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data))) + goto dropped; + + /* we released the lock, recheck everything */ + goto again; + } if (G_UNLIKELY ((*peer = GST_PAD_PEER (pad)) == NULL)) goto not_linked; @@ -3920,16 +3945,16 @@ pad_post_push (GstPad * pad) if (pad->priv->using == 0) { /* pad is not active anymore, check if we need to trigger the block */ if (GST_PAD_IS_BLOCKED (pad)) { - GstPadBlockCallback callback; + GstPadIdleCallback callback; gpointer user_data; - callback = pad->block_callback; + callback = pad->block_idle_cb; user_data = pad->block_data; GST_PAD_BLOCK_BROADCAST (pad); GST_OBJECT_UNLOCK (pad); if (callback) - callback (pad, TRUE, user_data); + callback (pad, user_data); return; } @@ -4306,7 +4331,8 @@ gboolean gst_pad_push_event (GstPad * pad, GstEvent * event) { GstPad *peerpad; - gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE; + gboolean result, need_probes, do_probes = TRUE, do_event_actions = + TRUE, do_callback; gboolean stored = FALSE; g_return_val_if_fail (GST_IS_PAD (pad), FALSE); @@ -4415,46 +4441,60 @@ again: } } + do_callback = TRUE; do { - /* we emit signals on the pad arg, the peer will have a chance to - * emit in the _chain() function */ - if (G_UNLIKELY (need_probes)) { - /* don't do probes next time */ - do_probes = FALSE; - /* unlock before emitting */ - GST_OBJECT_UNLOCK (pad); + GstPadBlockCallback callback; + gpointer user_data; - /* if the signal handler returned FALSE, it means we should just drop the - * buffer */ - /* FIXME, we need more return values so that we can influence the pad - * block below and let it temporarily unblock for this buffer */ - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event))) - goto dropped; - - /* we released the lock, recheck everything */ - goto again; - } + if (GST_PAD_IS_FLUSHING (pad)) + goto flushed; - /* when we get here, the item is not dropped by the probe, if we are - * blocking, we now need to wait until unblocked */ + /* if we are blocking, we now need to wait until unblocked */ if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad))) break; - /* now we block the streaming thread. It can be unlocked when we - * deactivate the pad (which will also set the FLUSHING flag) or - * when the pad is unblocked. A flushing event will also unblock - * the pad after setting the FLUSHING flag. */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "Waiting to be unblocked or set flushing"); - GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); - GST_PAD_BLOCK_WAIT (pad); - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); + callback = pad->block_cb; + user_data = pad->block_data; + if (do_callback && callback) { + do_callback = FALSE; + GST_OBJECT_UNLOCK (pad); - if (GST_PAD_IS_FLUSHING (pad)) - goto flushed; + callback (pad, GST_MINI_OBJECT_CAST (event), user_data); + /* we released the log, recheck */ + GST_OBJECT_LOCK (pad); + } else { + /* now we block the streaming thread. It can be unlocked when we + * deactivate the pad (which will also set the FLUSHING flag) or + * when the pad is unblocked. A flushing event will also unblock + * the pad after setting the FLUSHING flag. */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "Waiting to be unblocked or set flushing"); + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); + GST_PAD_BLOCK_WAIT (pad); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); + } } while (TRUE); + /* we emit signals on the pad arg, the peer will have a chance to + * emit in the _chain() function */ + if (G_UNLIKELY (need_probes)) { + /* don't do probes next time */ + do_probes = FALSE; + /* unlock before emitting */ + GST_OBJECT_UNLOCK (pad); + + /* if the signal handler returned FALSE, it means we should just drop the + * buffer */ + /* FIXME, we need more return values so that we can influence the pad + * block below and let it temporarily unblock for this buffer */ + if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event))) + goto dropped; + + /* we released the lock, recheck everything */ + goto again; + } + /* now check the peer pad */ if (peerpad == NULL) goto not_linked; diff --git a/gst/gstpad.h b/gst/gstpad.h index 970e1ef51..5fad9ba71 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -471,15 +471,26 @@ typedef void (*GstPadFixateCapsFunction) (GstPad *pad, GstCaps *caps); typedef gboolean (*GstPadDispatcherFunction) (GstPad *pad, gpointer data); /** + * GstPadIdleCallback: + * @pad: the #GstPad that is idle + * @user_data: the gpointer to optional user data. + * + * Callback used by gst_pad_set_blocked(). Gets called when no data is flowing + * over the pad. This callback can be called either from the streaming thread or + * the thread that blocked the pad. + */ +typedef void (*GstPadIdleCallback) (GstPad *pad, gpointer user_data); + +/** * GstPadBlockCallback: * @pad: the #GstPad that is blockend or unblocked. - * @blocked: blocking state for the pad + * @object: the object that is blocked * @user_data: the gpointer to optional user data. * - * Callback used by gst_pad_set_blocked_async(). Gets called when the blocking + * Callback used by gst_pad_set_blocked(). Gets called when the blocking * operation succeeds. */ -typedef void (*GstPadBlockCallback) (GstPad *pad, gboolean blocked, gpointer user_data); +typedef void (*GstPadBlockCallback) (GstPad *pad, GstMiniObject *object, gpointer user_data); /** * GstPadStickyEventsForeachFunction: @@ -600,7 +611,8 @@ struct _GstPad { /*< public >*/ /* with LOCK */ /* block cond, mutex is from the object */ GCond *block_cond; - GstPadBlockCallback block_callback; + GstPadIdleCallback block_idle_cb; + GstPadBlockCallback block_cb; gpointer block_data; GDestroyNotify block_destroy_data; gboolean block_callback_called; @@ -804,9 +816,13 @@ gboolean gst_pad_is_active (GstPad *pad); gboolean gst_pad_activate_pull (GstPad *pad, gboolean active); gboolean gst_pad_activate_push (GstPad *pad, gboolean active); -gboolean gst_pad_set_blocked (GstPad *pad, gboolean blocked, - GstPadBlockCallback callback, gpointer user_data, +gboolean gst_pad_block (GstPad *pad, + GstPadIdleCallback idle_cb, + GstPadBlockCallback block_cb, + gpointer user_data, GDestroyNotify destroy_data); +void gst_pad_unblock (GstPad *pad); + gboolean gst_pad_is_blocked (GstPad *pad); gboolean gst_pad_is_blocking (GstPad *pad); |