diff options
author | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2009-09-26 12:17:49 +0200 |
---|---|---|
committer | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2009-10-07 17:46:28 +0200 |
commit | cf9c6a2271ad35e422c8fba6af842e3ea309d82d (patch) | |
tree | 7817c9cd294ef8aa80e4f2c989c2bb45f6beca09 | |
parent | 1e004cd36358af2f5323f69edfe40bfe96f602a2 (diff) |
decodebin2: Rewrite autoplugging and how groups of pads are exposed
This now keeps track of everything that is going on, creates
a tree of chains and groups to allow "demuxer after demuxer" scenarios
and allows chained Oggs with multiple streams (needs oggdemux or playbin2 fixes).
Also document everything in detail and give a general overview of what
decodebin2 is doing at the top of the sources.
Fixes bug #596183, #563828 and #591677.
-rw-r--r-- | gst/playback/gstdecodebin2.c | 1782 |
1 files changed, 946 insertions, 836 deletions
diff --git a/gst/playback/gstdecodebin2.c b/gst/playback/gstdecodebin2.c index 24e251bb2..1d6c9d258 100644 --- a/gst/playback/gstdecodebin2.c +++ b/gst/playback/gstdecodebin2.c @@ -1,5 +1,6 @@ /* GStreamer * Copyright (C) <2006> Edward Hervey <edward@fluendo.com> + * Copyright (C) <2009> Sebastian Dröge <sebastian.droege@collabora.co.uk> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -31,6 +32,51 @@ * decodebin2 instead of the older #GstDecodeBin for its internal auto-plugging. */ +/* Implementation notes: + * + * The following section describes how decodebin2 works internally. + * + * The first part of decodebin2 is it's typefind element, which tries + * to get the type of the input stream. If the type is found autoplugging starts. + * + * decodebin2 internally organizes the elements it autoplugged into GstDecodeChains + * and GstDecodeGroups. A decode chain is a single chain of decoding, this + * means that if decodebin2 every autoplugs an element with two+ srcpads + * (e.g. a demuxer) this will end the chain and everything following this + * demuxer will be put into decode groups below the chain. Otherwise, + * if an element has a single srcpad that outputs raw data the decode chain + * is ended too and a GstDecodePad is stored and blocked. + * + * A decode group combines a number of chains that are created by a + * demuxer element. All those chains are connected through a multiqueue to + * the demuxer. A new group for the same demuxer is only created if the + * demuxer has signaled no-more pads, in which case all following pads + * create a new chain in the new group. + * + * This continues until the top-level decode chain is complete. A decode + * chain is complete if it either ends with a blocked endpad, if autoplugging + * stopped because no suitable plugins could be found or if the active group + * is complete. A decode group OTOH is complete if all child chains are complete. + * + * If this happens at some point, all endpads of all active groups are exposed. + * For this decodebin2 adds the endpads, signals no-more-pads and then unblocks + * them. Now playback starts. + * + * If one of the chains that end on a endpad receives EOS decodebin2 checks upwards + * via the parent pointers if all chains and groups are drained. In that case + * everything goes into EOS. + * If there is a chain where the active group is drained but there exist next groups + * the active group is hidden (endpads are removed) and the next group is exposed. + * + * Note 1: If we're talking about blocked endpads this really means that the + * *target* pads of the endpads are blocked. Pads that are exposed to the outside + * should never ever be blocked! + * + * Note 2: If a group is complete and the parent's chain demuxer adds new pads + * but never signaled no-more-pads this additional pads will be ignored! + * + */ + #ifdef HAVE_CONFIG_H #include "config.h" #endif @@ -61,6 +107,7 @@ GST_STATIC_PAD_TEMPLATE ("src%d", GST_DEBUG_CATEGORY_STATIC (gst_decode_bin_debug); #define GST_CAT_DEFAULT gst_decode_bin_debug +typedef struct _GstDecodeChain GstDecodeChain; typedef struct _GstDecodeGroup GstDecodeGroup; typedef struct _GstDecodePad GstDecodePad; typedef GstGhostPadClass GstDecodePadClass; @@ -90,16 +137,14 @@ struct _GstDecodeBin GstElement *typefind; /* this holds the typefind object */ - GMutex *lock; /* Protects activegroup and groups */ - GstDecodeGroup *activegroup; /* group currently active */ - GList *groups; /* List of non-active GstDecodeGroups, sorted in - * order of creation. */ - GList *oldgroups; /* List of no-longer-used GstDecodeGroups. - * Should be freed in dispose */ + GMutex *expose_lock; /* Protects exposal and removal of groups */ + GstDecodeChain *decode_chain; /* Top level decode chain */ gint nbpads; /* unique identifier for source pads */ GValueArray *factories; /* factories we can use for selecting elements */ - GList *subtitles; /* List of elements with subtitle-encoding */ + + GList *subtitles; /* List of elements with subtitle-encoding, + * protected by object lock! */ gboolean have_type; /* if we received the have_type signal */ guint have_type_id; /* signal id for have-type from typefind */ @@ -170,8 +215,8 @@ static const GstElementDetails gst_decode_bin_details = GST_ELEMENT_DETAILS ("Decoder Bin", "Generic/Bin/Decoder", "Autoplug and decode to raw media", - "Edward Hervey <edward@fluendo.com>"); - + "Edward Hervey <edward.hervey@collabora.co.uk>, " + "Sebastian Dröge <sebastian.droege@collabora.co.uk>"); static void do_async_start (GstDecodeBin * dbin); static void do_async_done (GstDecodeBin * dbin); @@ -194,33 +239,31 @@ static void gst_decode_bin_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_decode_bin_set_caps (GstDecodeBin * dbin, GstCaps * caps); static GstCaps *gst_decode_bin_get_caps (GstDecodeBin * dbin); -static void caps_notify_group_cb (GstPad * pad, GParamSpec * unused, - GstDecodeGroup * group); static void caps_notify_cb (GstPad * pad, GParamSpec * unused, - GstDecodeBin * dbin); + GstDecodeChain * chain); static GstPad *find_sink_pad (GstElement * element); static GstStateChangeReturn gst_decode_bin_change_state (GstElement * element, GstStateChange transition); -#define DECODE_BIN_LOCK(dbin) G_STMT_START { \ +#define EXPOSE_LOCK(dbin) G_STMT_START { \ GST_LOG_OBJECT (dbin, \ - "locking from thread %p", \ + "expose locking from thread %p", \ g_thread_self ()); \ - g_mutex_lock (GST_DECODE_BIN_CAST(dbin)->lock); \ + g_mutex_lock (GST_DECODE_BIN_CAST(dbin)->expose_lock); \ GST_LOG_OBJECT (dbin, \ - "locked from thread %p", \ + "expose locked from thread %p", \ g_thread_self ()); \ } G_STMT_END -#define DECODE_BIN_UNLOCK(dbin) G_STMT_START { \ +#define EXPOSE_UNLOCK(dbin) G_STMT_START { \ GST_LOG_OBJECT (dbin, \ - "unlocking from thread %p", \ + "expose unlocking from thread %p", \ g_thread_self ()); \ - g_mutex_unlock (GST_DECODE_BIN_CAST(dbin)->lock); \ + g_mutex_unlock (GST_DECODE_BIN_CAST(dbin)->expose_lock); \ } G_STMT_END -#define DECODE_BIN_DYN_LOCK(dbin) G_STMT_START { \ +#define DYN_LOCK(dbin) G_STMT_START { \ GST_LOG_OBJECT (dbin, \ "dynlocking from thread %p", \ g_thread_self ()); \ @@ -230,7 +273,7 @@ static GstStateChangeReturn gst_decode_bin_change_state (GstElement * element, g_thread_self ()); \ } G_STMT_END -#define DECODE_BIN_DYN_UNLOCK(dbin) G_STMT_START { \ +#define DYN_UNLOCK(dbin) G_STMT_START { \ GST_LOG_OBJECT (dbin, \ "dynunlocking from thread %p", \ g_thread_self ()); \ @@ -241,68 +284,109 @@ static GstStateChangeReturn gst_decode_bin_change_state (GstElement * element, * * Streams belonging to the same group/chain of a media file * + * When changing something here lock the parent chain! */ struct _GstDecodeGroup { GstDecodeBin *dbin; - GMutex *lock; - GstElement *multiqueue; - - gboolean exposed; /* TRUE if this group is exposed */ - gboolean drained; /* TRUE if EOS went through all endpads */ - gboolean blocked; /* TRUE if all endpads are blocked */ - gboolean complete; /* TRUE if we are not expecting anymore streams - * on this group */ + GstDecodeChain *parent; + + GstElement *multiqueue; /* Used for linking all child chains */ gulong overrunsig; /* the overrun signal for multiqueue */ - guint nbdynamic; /* number of dynamic pads in the group. */ - GList *endpads; /* List of GstDecodePad of source pads to be exposed */ - GList *reqpads; /* List of RequestPads for multiqueue. */ + gboolean overrun; /* TRUE if the multiqueue signaled overrun. This + * means that we should really expose the group */ + + gboolean no_more_pads; /* TRUE if the demuxer signaled no-more-pads */ + gboolean drained; /* TRUE if the all children are drained */ + + GList *children; /* List of GstDecodeChains in this group */ + + GList *reqpads; /* List of RequestPads for multiqueue, there is + * exactly one RequestPad per child chain */ }; -#define GROUP_MUTEX_LOCK(group) G_STMT_START { \ - GST_LOG_OBJECT (group->dbin, \ - "locking group %p from thread %p", \ - group, g_thread_self ()); \ - g_mutex_lock (group->lock); \ - GST_LOG_OBJECT (group->dbin, \ - "locked group %p from thread %p", \ - group, g_thread_self ()); \ -} G_STMT_END +struct _GstDecodeChain +{ + GstDecodeGroup *parent; + GstDecodeBin *dbin; -#define GROUP_MUTEX_UNLOCK(group) G_STMT_START { \ - GST_LOG_OBJECT (group->dbin, \ - "unlocking group %p from thread %p", \ - group, g_thread_self ()); \ - g_mutex_unlock (group->lock); \ -} G_STMT_END + GMutex *lock; /* Protects this chain and its groups */ + gboolean demuxer; /* TRUE if elements->data is a demuxer */ + GList *elements; /* All elements in this group, first + is the latest and most downstream element */ -static GstDecodeGroup *gst_decode_group_new (GstDecodeBin * decode_bin, - gboolean use_queue); -static GstPad *gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, - GstPad * pad); -static gboolean gst_decode_group_control_source_pad (GstDecodeGroup * group, - GstDecodePad * pad); -static gboolean gst_decode_group_expose (GstDecodeGroup * group); -static gboolean gst_decode_group_check_if_blocked (GstDecodeGroup * group); -static void gst_decode_group_set_complete (GstDecodeGroup * group); + /* Note: there are only groups if the last element of this chain + * is a demuxer, otherwise the chain will end with an endpad. + * The other way around this means, that endpad only exists if this + * chain doesn't end with a demuxer! */ + + GstDecodeGroup *active_group; /* Currently active group */ + GList *next_groups; /* head is newest group, tail is next group. + a new group will be created only if the head + group had no-more-pads. If it's only exposed + all new pads will be ignored! */ + GList *pending_pads; /* Pads that have no fixed caps yet */ + + GstDecodePad *endpad; /* Pad of this chain that could be exposed */ + gboolean deadend; /* This chain is incomplete and can't be completed, + e.g. no suitable decoder could be found + */ + + /* FIXME: This should be done directly via a thread! */ + GList *old_groups; /* Groups that should be freed later */ +}; + +static void gst_decode_chain_free (GstDecodeChain * chain); +static GstDecodeChain *gst_decode_chain_new (GstDecodeBin * dbin, + GstDecodeGroup * group); static void gst_decode_group_hide (GstDecodeGroup * group); static void gst_decode_group_free (GstDecodeGroup * group); +static GstDecodeGroup *gst_decode_group_new (GstDecodeBin * dbin, + GstDecodeChain * chain); +static gboolean gst_decode_chain_is_complete (GstDecodeChain * chain); +static void gst_decode_chain_handle_eos (GstDecodeChain * chain); +static gboolean gst_decode_chain_expose (GstDecodeChain * chain, + GList ** endpads); +static gboolean gst_decode_chain_is_drained (GstDecodeChain * chain); +static gboolean gst_decode_group_is_complete (GstDecodeGroup * group); +static GstPad *gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, + GstPad * pad); +static gboolean gst_decode_group_is_drained (GstDecodeGroup * group); + +static gboolean gst_decode_bin_expose (GstDecodeBin * dbin); + +#define CHAIN_MUTEX_LOCK(chain) G_STMT_START { \ + GST_LOG_OBJECT (chain->dbin, \ + "locking chain %p from thread %p", \ + chain, g_thread_self ()); \ + g_mutex_lock (chain->lock); \ + GST_LOG_OBJECT (chain->dbin, \ + "locked chain %p from thread %p", \ + chain, g_thread_self ()); \ +} G_STMT_END + +#define CHAIN_MUTEX_UNLOCK(chain) G_STMT_START { \ + GST_LOG_OBJECT (chain->dbin, \ + "unlocking chain %p from thread %p", \ + chain, g_thread_self ()); \ + g_mutex_unlock (chain->lock); \ +} G_STMT_END /* GstDecodePad * - * GstPad private used for source pads of groups + * GstPad private used for source pads of chains */ struct _GstDecodePad { GstGhostPad parent; GstDecodeBin *dbin; - GstDecodeGroup *group; + GstDecodeChain *chain; - gboolean blocked; /* the pad is blocked */ + gboolean blocked; /* the *target* pad is blocked */ + gboolean exposed; /* the pad is exposed */ gboolean drained; /* an EOS has been seen on the pad */ - gboolean added; /* the pad is added to decodebin */ }; G_DEFINE_TYPE (GstDecodePad, gst_decode_pad, GST_TYPE_GHOST_PAD); @@ -310,10 +394,11 @@ G_DEFINE_TYPE (GstDecodePad, gst_decode_pad, GST_TYPE_GHOST_PAD); #define GST_DECODE_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_DECODE_PAD,GstDecodePad)) static GstDecodePad *gst_decode_pad_new (GstDecodeBin * dbin, GstPad * pad, - GstDecodeGroup * group); + GstDecodeChain * chain); static void gst_decode_pad_activate (GstDecodePad * dpad, - GstDecodeGroup * group); + GstDecodeChain * chain); static void gst_decode_pad_unblock (GstDecodePad * dpad); +static void gst_decode_pad_set_blocked (GstDecodePad * dpad, gboolean blocked); /******************************** * Standard GObject boilerplate * @@ -643,9 +728,8 @@ gst_decode_bin_init (GstDecodeBin * decode_bin) G_CALLBACK (type_found), decode_bin); } - decode_bin->lock = g_mutex_new (); - decode_bin->activegroup = NULL; - decode_bin->groups = NULL; + decode_bin->expose_lock = g_mutex_new (); + decode_bin->decode_chain = NULL; decode_bin->dyn_lock = g_mutex_new (); decode_bin->shutdown = FALSE; @@ -658,82 +742,6 @@ gst_decode_bin_init (GstDecodeBin * decode_bin) } static void -gst_decode_bin_remove_groups (GstDecodeBin * dbin) -{ - GList *tmp; - GstIterator *it; - gpointer point; - gboolean done; - GstIteratorResult res; - - GST_DEBUG_OBJECT (dbin, "cleaning up"); - - if (dbin->activegroup) { - GST_DEBUG_OBJECT (dbin, "free active group %p", dbin->activegroup); - gst_decode_group_free (dbin->activegroup); - dbin->activegroup = NULL; - } - - /* remove groups */ - for (tmp = dbin->groups; tmp; tmp = g_list_next (tmp)) { - GstDecodeGroup *group = (GstDecodeGroup *) tmp->data; - - GST_DEBUG_OBJECT (dbin, "free group %p", group); - gst_decode_group_free (group); - } - g_list_free (dbin->groups); - dbin->groups = NULL; - - for (tmp = dbin->oldgroups; tmp; tmp = g_list_next (tmp)) { - GstDecodeGroup *group = (GstDecodeGroup *) tmp->data; - - GST_DEBUG_OBJECT (dbin, "free old group %p", group); - gst_decode_group_free (group); - } - g_list_free (dbin->oldgroups); - dbin->oldgroups = NULL; - - GST_DEBUG_OBJECT (dbin, "removing last elements"); - - /* remove all remaining elements */ - it = gst_bin_iterate_elements (GST_BIN_CAST (dbin)); -restart: - done = FALSE; - while (!done) { - res = gst_iterator_next (it, &point); - switch (res) { - case GST_ITERATOR_DONE: - done = TRUE; - break; - case GST_ITERATOR_RESYNC: - gst_iterator_resync (it); - goto restart; - case GST_ITERATOR_ERROR: - GST_WARNING_OBJECT (dbin, - "Had an error while iterating bin %s", GST_ELEMENT_NAME (dbin)); - done = TRUE; - break; - case GST_ITERATOR_OK: - { - GstElement *elem = GST_ELEMENT_CAST (point); - - /* don't remove the typefind element */ - if (elem != dbin->typefind) { - GST_DEBUG_OBJECT (dbin, "remove element %s", GST_ELEMENT_NAME (elem)); - gst_bin_remove (GST_BIN_CAST (dbin), elem); - gst_element_set_state (elem, GST_STATE_NULL); - } - gst_object_unref (elem); - break; - } - default: - break; - } - } - gst_iterator_free (it); -} - -static void gst_decode_bin_dispose (GObject * object) { GstDecodeBin *decode_bin; @@ -744,7 +752,9 @@ gst_decode_bin_dispose (GObject * object) g_value_array_free (decode_bin->factories); decode_bin->factories = NULL; - gst_decode_bin_remove_groups (decode_bin); + if (decode_bin->decode_chain) + gst_decode_chain_free (decode_bin->decode_chain); + decode_bin->decode_chain = NULL; if (decode_bin->caps) gst_caps_unref (decode_bin->caps); @@ -766,9 +776,9 @@ gst_decode_bin_finalize (GObject * object) decode_bin = GST_DECODE_BIN (object); - if (decode_bin->lock) { - g_mutex_free (decode_bin->lock); - decode_bin->lock = NULL; + if (decode_bin->expose_lock) { + g_mutex_free (decode_bin->expose_lock); + decode_bin->expose_lock = NULL; } if (decode_bin->dyn_lock) { @@ -790,21 +800,10 @@ gst_decode_bin_finalize (GObject * object) static void gst_decode_bin_set_caps (GstDecodeBin * dbin, GstCaps * caps) { - GstCaps *old; - GST_DEBUG_OBJECT (dbin, "Setting new caps: %" GST_PTR_FORMAT, caps); GST_OBJECT_LOCK (dbin); - old = dbin->caps; - if (old != caps) { - if (caps) - gst_caps_ref (caps); - - dbin->caps = caps; - - if (old) - gst_caps_unref (old); - } + gst_caps_replace (&dbin->caps, caps); GST_OBJECT_UNLOCK (dbin); } @@ -994,26 +993,20 @@ static gboolean is_demuxer_element (GstElement * srcelement); static gboolean connect_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, GstPad * pad, GstCaps * caps, GValueArray * factories, - GstDecodeGroup * group); + GstDecodeChain * chain); static gboolean connect_element (GstDecodeBin * dbin, GstElement * element, - GstDecodeGroup * group); + GstDecodeChain * chain); static void expose_pad (GstDecodeBin * dbin, GstElement * src, - GstDecodePad * dpad, GstPad * pad, GstDecodeGroup * group); + GstDecodePad * dpad, GstPad * pad, GstDecodeChain * chain); -static void pad_added_group_cb (GstElement * element, GstPad * pad, - GstDecodeGroup * group); -static void pad_removed_group_cb (GstElement * element, GstPad * pad, - GstDecodeGroup * group); -static void no_more_pads_group_cb (GstElement * element, - GstDecodeGroup * group); static void pad_added_cb (GstElement * element, GstPad * pad, - GstDecodeBin * dbin); + GstDecodeChain * chain); static void pad_removed_cb (GstElement * element, GstPad * pad, - GstDecodeBin * dbin); -static void no_more_pads_cb (GstElement * element, GstDecodeBin * dbin); + GstDecodeChain * chain); +static void no_more_pads_cb (GstElement * element, GstDecodeChain * chain); -static GstDecodeGroup *get_current_group (GstDecodeBin * dbin, - gboolean create, gboolean demux, gboolean * created); +static GstDecodeGroup *gst_decode_chain_get_current_group (GstDecodeChain * + chain); /* called when a new pad is discovered. It will perform some basic actions * before trying to link something to it. @@ -1029,7 +1022,7 @@ static GstDecodeGroup *get_current_group (GstDecodeBin * dbin, */ static void analyze_new_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, - GstCaps * caps, GstDecodeGroup * group) + GstCaps * caps, GstDecodeChain * chain) { gboolean apcontinue = TRUE; GValueArray *factories = NULL, *result = NULL; @@ -1038,13 +1031,40 @@ analyze_new_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, GST_DEBUG_OBJECT (dbin, "Pad %s:%s caps:%" GST_PTR_FORMAT, GST_DEBUG_PAD_NAME (pad), caps); + if (chain->elements && src != chain->elements->data) { + GST_ERROR_OBJECT (dbin, "New pad from not the last element in this chain"); + return; + } + + if (chain->endpad) { + GST_ERROR_OBJECT (dbin, "New pad in a chain that is already complete"); + return; + } + + if (chain->demuxer) { + GstDecodeGroup *group; + GstDecodeChain *oldchain = chain; + + CHAIN_MUTEX_LOCK (oldchain); + group = gst_decode_chain_get_current_group (chain); + if (group) { + chain = gst_decode_chain_new (dbin, group); + group->children = g_list_prepend (group->children, chain); + } + CHAIN_MUTEX_UNLOCK (oldchain); + if (!group) { + GST_WARNING_OBJECT (dbin, "No current group"); + return; + } + } + if ((caps == NULL) || gst_caps_is_empty (caps)) goto unknown_type; if (gst_caps_is_any (caps)) goto any_caps; - dpad = gst_decode_pad_new (dbin, pad, group); + dpad = gst_decode_pad_new (dbin, pad, chain); /* 1. Emit 'autoplug-continue' the result will tell us if this pads needs * further autoplugging. */ @@ -1088,7 +1108,7 @@ analyze_new_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, /* 1.e else continue autoplugging something from the list. */ GST_LOG_OBJECT (pad, "Let's continue discovery on this pad"); - connect_pad (dbin, src, dpad, pad, caps, factories, group); + connect_pad (dbin, src, dpad, pad, caps, factories, chain); gst_object_unref (dpad); g_value_array_free (factories); @@ -1098,7 +1118,7 @@ analyze_new_pad (GstDecodeBin * dbin, GstElement * src, GstPad * pad, expose_pad: { GST_LOG_OBJECT (dbin, "Pad is final. autoplug-continue:%d", apcontinue); - expose_pad (dbin, src, dpad, pad, group); + expose_pad (dbin, src, dpad, pad, chain); gst_object_unref (dpad); return; } @@ -1108,14 +1128,19 @@ unknown_type: g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_UNKNOWN_TYPE], 0, pad, caps); - /* Check if there are no pending groups, if so, commit our state */ - if (dbin->groups == NULL) { - do_async_done (dbin); - } + chain->deadend = TRUE; gst_element_post_message (GST_ELEMENT_CAST (dbin), gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps)); + /* Try to expose anything */ + EXPOSE_LOCK (dbin); + if (gst_decode_chain_is_complete (dbin->decode_chain)) { + gst_decode_bin_expose (dbin); + } + EXPOSE_UNLOCK (dbin); + do_async_done (dbin); + if (src == dbin->typefind) { gchar *desc; @@ -1150,17 +1175,14 @@ any_caps: setup_caps_delay: { /* connect to caps notification */ - if (group) { - GROUP_MUTEX_LOCK (group); - group->nbdynamic++; - GST_LOG_OBJECT (dbin, "Group %p has now %d dynamic elements", group, - group->nbdynamic); - GROUP_MUTEX_UNLOCK (group); - g_signal_connect (G_OBJECT (pad), "notify::caps", - G_CALLBACK (caps_notify_group_cb), group); - } else - g_signal_connect (G_OBJECT (pad), "notify::caps", - G_CALLBACK (caps_notify_cb), dbin); + CHAIN_MUTEX_LOCK (chain); + GST_LOG_OBJECT (dbin, "Chain %p has now %d dynamic pads", chain, + g_list_length (chain->pending_pads)); + chain->pending_pads = + g_list_prepend (chain->pending_pads, gst_object_ref (pad)); + CHAIN_MUTEX_UNLOCK (chain); + g_signal_connect (G_OBJECT (pad), "notify::caps", + G_CALLBACK (caps_notify_cb), chain); return; } } @@ -1179,28 +1201,28 @@ setup_caps_delay: static gboolean connect_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, GstPad * pad, GstCaps * caps, GValueArray * factories, - GstDecodeGroup * group) + GstDecodeChain * chain) { gboolean res = FALSE; GstPad *mqpad = NULL; + gboolean is_demuxer = chain->parent && !chain->elements; /* First pad after the demuxer */ g_return_val_if_fail (factories != NULL, FALSE); g_return_val_if_fail (factories->n_values > 0, FALSE); - GST_DEBUG_OBJECT (dbin, "pad %s:%s , group:%p", - GST_DEBUG_PAD_NAME (pad), group); + GST_DEBUG_OBJECT (dbin, "pad %s:%s , chain:%p", + GST_DEBUG_PAD_NAME (pad), chain); /* 1. is element demuxer or parser */ - if (is_demuxer_element (src)) { - GST_LOG_OBJECT (src, "is a demuxer, connecting the pad through multiqueue"); - - if (!group) - group = get_current_group (dbin, TRUE, TRUE, NULL); + if (is_demuxer) { + GST_LOG_OBJECT (src, + "is a demuxer, connecting the pad through multiqueue '%s'", + GST_OBJECT_NAME (chain->parent->multiqueue)); gst_ghost_pad_set_target (GST_GHOST_PAD (dpad), NULL); - if (!(mqpad = gst_decode_group_control_demuxer_pad (group, pad))) + if (!(mqpad = gst_decode_group_control_demuxer_pad (chain->parent, pad))) goto beach; - src = group->multiqueue; + src = chain->parent->multiqueue; pad = mqpad; gst_ghost_pad_set_target (GST_GHOST_PAD (dpad), pad); } @@ -1213,6 +1235,11 @@ connect_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, GstPad *sinkpad; gboolean subtitle; + /* Set dpad target to pad again, it might've been unset + * below but we came back here because something failed + */ + gst_ghost_pad_set_target (GST_GHOST_PAD (dpad), pad); + /* take first factory */ factory = g_value_get_object (g_value_array_get_nth (factories, 0)); /* Remove selected factory from the list. */ @@ -1230,7 +1257,7 @@ connect_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, case GST_AUTOPLUG_SELECT_EXPOSE: GST_DEBUG_OBJECT (dbin, "autoplug select requested expose"); /* expose the pad, we don't have the source element */ - expose_pad (dbin, src, dpad, pad, group); + expose_pad (dbin, src, dpad, pad, chain); res = TRUE; goto beach; case GST_AUTOPLUG_SELECT_SKIP: @@ -1293,8 +1320,14 @@ connect_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, gst_object_unref (sinkpad); GST_LOG_OBJECT (dbin, "linked on pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + CHAIN_MUTEX_LOCK (chain); + chain->elements = + g_list_prepend (chain->elements, gst_object_ref (element)); + chain->demuxer = is_demuxer_element (element); + CHAIN_MUTEX_UNLOCK (chain); + /* link this element further */ - connect_element (dbin, element, group); + connect_element (dbin, element, chain); /* try to configure the subtitle encoding property when we can */ if (g_object_class_find_property (G_OBJECT_GET_CLASS (element), @@ -1310,10 +1343,22 @@ connect_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, /* Bring the element to the state of the parent */ if ((gst_element_set_state (element, GST_STATE_PAUSED)) == GST_STATE_CHANGE_FAILURE) { + GstElement *tmp = NULL; + GST_WARNING_OBJECT (dbin, "Couldn't set %s to PAUSED", GST_ELEMENT_NAME (element)); - gst_element_set_state (element, GST_STATE_NULL); - gst_bin_remove (GST_BIN (dbin), element); + + /* Remove all elements in this chain that were just added. No + * other thread could've added elements in the meantime */ + CHAIN_MUTEX_LOCK (chain); + do { + tmp = chain->elements->data; + gst_element_set_state (tmp, GST_STATE_NULL); + gst_bin_remove (GST_BIN (dbin), tmp); + chain->elements = g_list_delete_link (chain->elements, chain->elements); + } while (tmp != element); + CHAIN_MUTEX_UNLOCK (chain); + continue; } if (subtitle) { @@ -1337,15 +1382,15 @@ beach: static gboolean connect_element (GstDecodeBin * dbin, GstElement * element, - GstDecodeGroup * group) + GstDecodeChain * chain) { GList *pads; gboolean res = TRUE; gboolean dynamic = FALSE; GList *to_connect = NULL; - GST_DEBUG_OBJECT (dbin, "Attempting to connect element %s [group:%p] further", - GST_ELEMENT_NAME (element), group); + GST_DEBUG_OBJECT (dbin, "Attempting to connect element %s [chain:%p] further", + GST_ELEMENT_NAME (element), chain); /* 1. Loop over pad templates, grabbing existing pads along the way */ for (pads = GST_ELEMENT_GET_CLASS (element)->padtemplates; pads; @@ -1406,31 +1451,16 @@ connect_element (GstDecodeBin * dbin, GstElement * element, } } - /* 2. if there are more potential pads, connect to relevent signals */ + /* 2. if there are more potential pads, connect to relevant signals */ if (dynamic) { - if (group) { - GST_LOG_OBJECT (dbin, "Adding signals to element %s in group %p", - GST_ELEMENT_NAME (element), group); - GROUP_MUTEX_LOCK (group); - group->nbdynamic++; - GST_LOG_OBJECT (dbin, "Group %p has now %d dynamic elements", group, - group->nbdynamic); - GROUP_MUTEX_UNLOCK (group); - g_signal_connect (G_OBJECT (element), "pad-added", - G_CALLBACK (pad_added_group_cb), group); - g_signal_connect (G_OBJECT (element), "pad-removed", - G_CALLBACK (pad_removed_group_cb), group); - g_signal_connect (G_OBJECT (element), "no-more-pads", - G_CALLBACK (no_more_pads_group_cb), group); - } else { - /* This is a non-grouped element, the handlers are different */ - g_signal_connect (G_OBJECT (element), "pad-added", - G_CALLBACK (pad_added_cb), dbin); - g_signal_connect (G_OBJECT (element), "pad-removed", - G_CALLBACK (pad_removed_cb), dbin); - g_signal_connect (G_OBJECT (element), "no-more-pads", - G_CALLBACK (no_more_pads_cb), dbin); - } + GST_LOG_OBJECT (dbin, "Adding signals to element %s in chain %p", + GST_ELEMENT_NAME (element), chain); + g_signal_connect (G_OBJECT (element), "pad-added", + G_CALLBACK (pad_added_cb), chain); + g_signal_connect (G_OBJECT (element), "pad-removed", + G_CALLBACK (pad_removed_cb), chain); + g_signal_connect (G_OBJECT (element), "no-more-pads", + G_CALLBACK (no_more_pads_cb), chain); } /* 3. for every available pad, connect it */ @@ -1439,7 +1469,7 @@ connect_element (GstDecodeBin * dbin, GstElement * element, GstCaps *caps; caps = gst_pad_get_caps (pad); - analyze_new_pad (dbin, element, pad, caps, group); + analyze_new_pad (dbin, element, pad, caps, chain); if (caps) gst_caps_unref (caps); @@ -1452,43 +1482,42 @@ connect_element (GstDecodeBin * dbin, GstElement * element, /* expose_pad: * - * Expose the given pad on the group as a decoded pad. - * If group is NULL, a GstDecodeGroup will be created and setup properly. + * Expose the given pad on the chain as a decoded pad. */ static void expose_pad (GstDecodeBin * dbin, GstElement * src, GstDecodePad * dpad, - GstPad * pad, GstDecodeGroup * group) + GstPad * pad, GstDecodeChain * chain) { - gboolean newgroup = FALSE; - gboolean isdemux; GstPad *mqpad = NULL; - GST_DEBUG_OBJECT (dbin, "pad %s:%s, group:%p", - GST_DEBUG_PAD_NAME (pad), group); - - isdemux = is_demuxer_element (src); - - if (!group) - group = get_current_group (dbin, TRUE, isdemux, &newgroup); + GST_DEBUG_OBJECT (dbin, "pad %s:%s, chain:%p", + GST_DEBUG_PAD_NAME (pad), chain); - if (isdemux) { + /* If this is the first pad for this chain, there are no other elements + * and the source element is not the multiqueue we must link through the + * multiqueue. + * + * This is the case if a demuxer directly exposed a raw pad. + */ + if (chain->parent && !chain->elements && src != chain->parent->multiqueue) { GST_LOG_OBJECT (src, "connecting the pad through multiqueue"); gst_ghost_pad_set_target (GST_GHOST_PAD (dpad), NULL); - if (!(mqpad = gst_decode_group_control_demuxer_pad (group, pad))) + if (!(mqpad = gst_decode_group_control_demuxer_pad (chain->parent, pad))) goto beach; pad = mqpad; gst_ghost_pad_set_target (GST_GHOST_PAD (dpad), pad); } - gst_decode_group_control_source_pad (group, dpad); + gst_decode_pad_activate (dpad, chain); + chain->endpad = gst_object_ref (dpad); - if (newgroup && !isdemux) { - /* If we have discovered a raw pad and it doesn't belong to any group, - * that means there wasn't any demuxer. In that case, we consider the - * group as being complete. */ - gst_decode_group_set_complete (group); + EXPOSE_LOCK (dbin); + if (gst_decode_chain_is_complete (dbin->decode_chain)) { + gst_decode_bin_expose (dbin); } + EXPOSE_UNLOCK (dbin); + if (mqpad) gst_object_unref (mqpad); @@ -1514,16 +1543,17 @@ type_found (GstElement * typefind, guint probability, goto exit; } - /* we can only deal with one type, we don't yet support dynamically changing + /* FIXME: we can only deal with one type, we don't yet support dynamically changing * caps from the typefind element */ - if (decode_bin->have_type) + if (decode_bin->have_type || decode_bin->decode_chain) goto exit; decode_bin->have_type = TRUE; pad = gst_element_get_static_pad (typefind, "src"); - analyze_new_pad (decode_bin, typefind, pad, caps, NULL); + decode_bin->decode_chain = gst_decode_chain_new (decode_bin, NULL); + analyze_new_pad (decode_bin, typefind, pad, caps, decode_bin->decode_chain); gst_object_unref (pad); @@ -1532,133 +1562,116 @@ exit: } static void -pad_added_group_cb (GstElement * element, GstPad * pad, GstDecodeGroup * group) +pad_added_cb (GstElement * element, GstPad * pad, GstDecodeChain * chain) { GstCaps *caps; - gboolean expose = FALSE; GstDecodeBin *dbin; - dbin = group->dbin; + dbin = chain->dbin; - GST_DEBUG_OBJECT (pad, "pad added, group:%p", group); + GST_DEBUG_OBJECT (pad, "pad added, chain:%p", chain); caps = gst_pad_get_caps (pad); - analyze_new_pad (dbin, element, pad, caps, group); + analyze_new_pad (dbin, element, pad, caps, chain); if (caps) gst_caps_unref (caps); - GROUP_MUTEX_LOCK (group); - if (group->nbdynamic > 0) - group->nbdynamic--; - GST_LOG_OBJECT (dbin, "Group %p has now %d dynamic objects", group, - group->nbdynamic); - if (group->nbdynamic == 0) - expose = TRUE; - GROUP_MUTEX_UNLOCK (group); - - if (expose) { + EXPOSE_LOCK (dbin); + if (gst_decode_chain_is_complete (dbin->decode_chain)) { GST_LOG_OBJECT (dbin, "That was the last dynamic object, now attempting to expose the group"); - DECODE_BIN_LOCK (dbin); - if (!gst_decode_group_expose (group)) + if (!gst_decode_bin_expose (dbin)) GST_WARNING_OBJECT (dbin, "Couldn't expose group"); - DECODE_BIN_UNLOCK (dbin); } + EXPOSE_UNLOCK (dbin); } static void -pad_removed_group_cb (GstElement * element, GstPad * pad, - GstDecodeGroup * group) +pad_removed_cb (GstElement * element, GstPad * pad, GstDecodeChain * chain) { - GST_LOG_OBJECT (pad, "pad removed, group:%p", group); + GList *l; + + GST_LOG_OBJECT (pad, "pad removed, chain:%p", chain); /* In fact, we don't have to do anything here, the active group will be * removed when the group's multiqueue is drained */ + CHAIN_MUTEX_LOCK (chain); + for (l = chain->pending_pads; l; l = l->next) { + GstPad *opad = l->data; + + if (pad == opad) { + g_signal_handlers_disconnect_by_func (pad, caps_notify_cb, chain); + gst_object_unref (pad); + chain->pending_pads = g_list_delete_link (chain->pending_pads, l); + break; + } + } + CHAIN_MUTEX_UNLOCK (chain); } static void -no_more_pads_group_cb (GstElement * element, GstDecodeGroup * group) -{ - GST_LOG_OBJECT (element, "no more pads, setting group %p to complete", group); - - /* when we received no_more_pads, we can complete the pads of the group */ - gst_decode_group_set_complete (group); -} - -static void -pad_added_cb (GstElement * element, GstPad * pad, GstDecodeBin * dbin) -{ - GstCaps *caps; - - GST_LOG_OBJECT (pad, "Pad added to non-grouped element"); - - caps = gst_pad_get_caps (pad); - analyze_new_pad (dbin, element, pad, caps, NULL); - if (caps) - gst_caps_unref (caps); -} - -static void -pad_removed_cb (GstElement * element, GstPad * pad, GstDecodeBin * dbin) -{ - GST_LOG_OBJECT (pad, "Pad removed from non-grouped element"); -} - -static void -no_more_pads_cb (GstElement * element, GstDecodeBin * dbin) +no_more_pads_cb (GstElement * element, GstDecodeChain * chain) { - GstDecodeGroup *group; - - GST_LOG_OBJECT (element, "No more pads, setting current group to complete"); - - /* Find the non-complete group, there should only be one */ - if (!(group = get_current_group (dbin, FALSE, FALSE, NULL))) - goto no_group; - - gst_decode_group_set_complete (group); + GstDecodeGroup *group = NULL; - return; + GST_LOG_OBJECT (element, "got no more pads"); -no_group: - { - GST_DEBUG_OBJECT (dbin, "We couldn't find a non-completed group"); + if (!chain->elements || (GstElement *) chain->elements->data != element) { + GST_LOG_OBJECT (chain->dbin, "no-more-pads from old chain element '%s'", + GST_OBJECT_NAME (element)); return; } -} -static void -caps_notify_cb (GstPad * pad, GParamSpec * unused, GstDecodeBin * dbin) -{ - GstElement *element; - - GST_LOG_OBJECT (dbin, "Notified caps for pad %s:%s", - GST_DEBUG_PAD_NAME (pad)); - - /* Disconnect this; if we still need it, we'll reconnect to this in - * analyze_new_pad */ - g_signal_handlers_disconnect_by_func (pad, caps_notify_cb, dbin); + CHAIN_MUTEX_LOCK (chain); + /* when we received no_more_pads, we can complete the pads of the chain */ + if (!chain->next_groups && chain->active_group) { + group = chain->active_group; + } else if (chain->next_groups) { + group = chain->next_groups->data; + } + if (!group) { + GST_ERROR_OBJECT (chain->dbin, "can't find group for element"); + CHAIN_MUTEX_UNLOCK (chain); + return; + } - element = GST_ELEMENT_CAST (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (element, "Setting group %p to complete", group); - pad_added_cb (element, pad, dbin); + group->no_more_pads = TRUE; + CHAIN_MUTEX_UNLOCK (chain); - gst_object_unref (element); + EXPOSE_LOCK (chain->dbin); + if (gst_decode_chain_is_complete (chain->dbin->decode_chain)) { + gst_decode_bin_expose (chain->dbin); + } + EXPOSE_UNLOCK (chain->dbin); } static void -caps_notify_group_cb (GstPad * pad, GParamSpec * unused, GstDecodeGroup * group) +caps_notify_cb (GstPad * pad, GParamSpec * unused, GstDecodeChain * chain) { GstElement *element; + GList *l; GST_LOG_OBJECT (pad, "Notified caps for pad %s:%s", GST_DEBUG_PAD_NAME (pad)); /* Disconnect this; if we still need it, we'll reconnect to this in * analyze_new_pad */ - g_signal_handlers_disconnect_by_func (pad, caps_notify_group_cb, group); + g_signal_handlers_disconnect_by_func (pad, caps_notify_cb, chain); element = GST_ELEMENT_CAST (gst_pad_get_parent (pad)); - pad_added_group_cb (element, pad, group); + CHAIN_MUTEX_LOCK (chain); + for (l = chain->pending_pads; l; l = l->next) { + if (l->data == pad) { + gst_object_unref (GST_OBJECT_CAST (l->data)); + chain->pending_pads = g_list_delete_link (chain->pending_pads, l); + break; + } + } + CHAIN_MUTEX_UNLOCK (chain); + + pad_added_cb (element, pad, chain); gst_object_unref (element); } @@ -1721,25 +1734,204 @@ is_demuxer_element (GstElement * srcelement) static gboolean are_raw_caps (GstDecodeBin * dbin, GstCaps * caps) { - GstCaps *intersection; gboolean res; GST_LOG_OBJECT (dbin, "Checking with caps %" GST_PTR_FORMAT, caps); /* lock for getting the caps */ GST_OBJECT_LOCK (dbin); - intersection = gst_caps_intersect (dbin->caps, caps); + res = gst_caps_can_intersect (dbin->caps, caps); GST_OBJECT_UNLOCK (dbin); - res = (!(gst_caps_is_empty (intersection))); - - gst_caps_unref (intersection); - GST_LOG_OBJECT (dbin, "Caps are %sfinal caps", res ? "" : "not "); return res; } +/**** + * GstDecodeChain functions + ****/ + +/* gst_decode_chain_get_current_group: + * + * Returns the current group of this chain, to which + * new chains should be attached or NULL if the last + * group didn't have no-more-pads. + * + * Not MT-safe: Call with parent chain lock! + */ +static GstDecodeGroup * +gst_decode_chain_get_current_group (GstDecodeChain * chain) +{ + GstDecodeGroup *group; + + if (!chain->next_groups && chain->active_group + && chain->active_group->overrun && !chain->active_group->no_more_pads) { + GST_WARNING_OBJECT (chain->dbin, + "Currently active group %p is exposed" + " and wants to add a new pad without having signaled no-more-pads", + chain->active_group); + return NULL; + } + + if (chain->next_groups && (group = chain->next_groups->data) && group->overrun + && !group->no_more_pads) { + GST_WARNING_OBJECT (chain->dbin, + "Currently newest pending group %p " + "had overflow but didn't signal no-more-pads", group); + return NULL; + } + + /* Now we know that we can really return something useful */ + if (!chain->active_group) { + chain->active_group = group = gst_decode_group_new (chain->dbin, chain); + } else if (!chain->active_group->overrun + && !chain->active_group->no_more_pads) { + group = chain->active_group; + } else if (chain->next_groups && (group = chain->next_groups->data) + && !group->overrun && !group->no_more_pads) { + /* group = chain->next_groups->data */ + } else { + group = gst_decode_group_new (chain->dbin, chain); + chain->next_groups = g_list_prepend (chain->next_groups, group); + } + + return group; +} + +static void gst_decode_group_free_internal (GstDecodeGroup * group, + gboolean hide); + +static void +gst_decode_chain_free_internal (GstDecodeChain * chain, gboolean hide) +{ + GList *l; + + CHAIN_MUTEX_LOCK (chain); + + GST_DEBUG_OBJECT (chain->dbin, "%s chain %p", (hide ? "Hiding" : "Freeing"), + chain); + + if (chain->active_group) { + gst_decode_group_free_internal (chain->active_group, hide); + if (!hide) + chain->active_group = NULL; + } + + for (l = chain->next_groups; l; l = l->next) { + gst_decode_group_free_internal ((GstDecodeGroup *) l->data, hide); + if (!hide) + l->data = NULL; + } + if (!hide) { + g_list_free (chain->next_groups); + chain->next_groups = NULL; + } + + if (!hide) { + for (l = chain->old_groups; l; l = l->next) { + GstDecodeGroup *group = l->data; + + gst_decode_group_free (group); + } + g_list_free (chain->old_groups); + chain->old_groups = NULL; + } + + for (l = chain->pending_pads; l; l = l->next) { + GstPad *pad = GST_PAD (l->data); + + g_signal_handlers_disconnect_by_func (pad, caps_notify_cb, chain); + gst_object_unref (pad); + l->data = NULL; + } + g_list_free (chain->pending_pads); + chain->pending_pads = NULL; + + for (l = chain->elements; l; l = l->next) { + GstElement *element = GST_ELEMENT (l->data); + + g_signal_handlers_disconnect_by_func (element, pad_added_cb, chain); + g_signal_handlers_disconnect_by_func (element, pad_removed_cb, chain); + g_signal_handlers_disconnect_by_func (element, no_more_pads_cb, chain); + + if (GST_OBJECT_PARENT (element) == GST_OBJECT_CAST (chain->dbin)) + gst_bin_remove (GST_BIN_CAST (chain->dbin), element); + if (!hide) { + gst_element_set_state (element, GST_STATE_NULL); + } + + GST_OBJECT_LOCK (chain->dbin); + /* remove possible subtitle element */ + chain->dbin->subtitles = g_list_remove (chain->dbin->subtitles, element); + GST_OBJECT_UNLOCK (chain->dbin); + + if (!hide) { + gst_object_unref (element); + l->data = NULL; + } + } + if (!hide) { + g_list_free (chain->elements); + chain->elements = NULL; + } + + if (chain->endpad) { + if (chain->endpad->exposed) + gst_element_remove_pad (GST_ELEMENT_CAST (chain->dbin), + GST_PAD_CAST (chain->endpad)); + + chain->endpad->exposed = FALSE; + if (!hide) { + gst_object_unref (chain->endpad); + chain->endpad = NULL; + } + } + + GST_DEBUG_OBJECT (chain->dbin, "%s chain %p", (hide ? "Hided" : "Freed"), + chain); + CHAIN_MUTEX_UNLOCK (chain); + if (!hide) { + g_mutex_free (chain->lock); + g_slice_free (GstDecodeChain, chain); + } +} + +/* gst_decode_chain_free: + * + * Completely frees and removes the chain and all + * child groups from decodebin2. + * + * MT-safe, don't hold the chain lock or any child chain's lock + * when calling this! + */ +static void +gst_decode_chain_free (GstDecodeChain * chain) +{ + gst_decode_chain_free_internal (chain, FALSE); +} + +/* gst_decode_chain_new: + * + * Creates a new decode chain and initializes it. + * + * It's up to the caller to add it to the list of child chains of + * a group! + */ +static GstDecodeChain * +gst_decode_chain_new (GstDecodeBin * dbin, GstDecodeGroup * parent) +{ + GstDecodeChain *chain = g_slice_new0 (GstDecodeChain); + + GST_DEBUG_OBJECT (dbin, "Creating new chain %p with parent group %p", chain, + parent); + + chain->dbin = dbin; + chain->parent = parent; + chain->lock = g_mutex_new (); + + return chain; +} /**** * GstDecodeGroup functions @@ -1754,135 +1946,152 @@ static void multi_queue_overrun_cb (GstElement * queue, GstDecodeGroup * group) { GstDecodeBin *dbin; - gboolean expose; dbin = group->dbin; - GST_LOG_OBJECT (dbin, "multiqueue %p is full", queue); + GST_LOG_OBJECT (dbin, "multiqueue '%s' (%p) is full", GST_OBJECT_NAME (queue), + queue); - GROUP_MUTEX_LOCK (group); - if (group->complete) { - /* the group was already complete (had the no_more_pads called), we - * can ignore the overrun signal, the last remaining dynamic element - * will expose the group eventually. */ - GST_LOG_OBJECT (dbin, "group %p was already complete", group); - expose = FALSE; - } else { - /* set number of dynamic element to 0, we don't expect anything anymore - * and we need the groups to be 0 for the expose to work */ - group->nbdynamic = 0; - expose = TRUE; - } - GROUP_MUTEX_UNLOCK (group); + group->overrun = TRUE; + + /* FIXME: We should make sure that everything gets exposed now + * even if child chains are not complete because the will never + * be complete! Ignore any non-complete chains when exposing + * and never expose them later + */ - if (expose) { - DECODE_BIN_LOCK (dbin); - if (!gst_decode_group_expose (group)) + EXPOSE_LOCK (dbin); + if (gst_decode_chain_is_complete (dbin->decode_chain)) { + if (!gst_decode_bin_expose (dbin)) GST_WARNING_OBJECT (dbin, "Couldn't expose group"); - DECODE_BIN_UNLOCK (group->dbin); } + EXPOSE_UNLOCK (group->dbin); } -/* gst_decode_group_new: - * - * Creates a new GstDecodeGroup. It is up to the caller to add it to the list - * of groups. - */ -static GstDecodeGroup * -gst_decode_group_new (GstDecodeBin * dbin, gboolean use_queue) +static void +gst_decode_group_free_internal (GstDecodeGroup * group, gboolean hide) { - GstDecodeGroup *group; - GstElement *mq; + GList *l; - GST_LOG_OBJECT (dbin, "Creating new group"); + GST_DEBUG_OBJECT (group->dbin, "%s group %p", (hide ? "Hiding" : "Freeing"), + group); + for (l = group->children; l; l = l->next) { + GstDecodeChain *chain = (GstDecodeChain *) l->data; - if (use_queue) { - if (!(mq = gst_element_factory_make ("multiqueue", NULL))) { - GST_ERROR_OBJECT (dbin, "Couldn't create multiqueue element"); - return NULL; + gst_decode_chain_free_internal (chain, hide); + if (!hide) + l->data = NULL; + } + if (!hide) { + g_list_free (group->children); + group->children = NULL; + } + + if (!hide) { + for (l = group->reqpads; l; l = l->next) { + GstPad *pad = l->data; + + gst_element_release_request_pad (group->multiqueue, pad); + gst_object_unref (pad); + l->data = NULL; } - } else { - mq = NULL; + g_list_free (group->reqpads); + group->reqpads = NULL; } - group = g_new0 (GstDecodeGroup, 1); - group->lock = g_mutex_new (); - group->dbin = dbin; - group->multiqueue = mq; - group->exposed = FALSE; - group->drained = FALSE; - group->blocked = FALSE; - group->complete = FALSE; - group->endpads = NULL; - group->reqpads = NULL; - - if (mq) { - /* we first configure the multiqueue to buffer an unlimited number of - * buffers up to 5 seconds or, when no timestamps are present, up to 2 MB of - * memory. When this queue overruns, we assume the group is complete and can - * be exposed. */ - g_object_set (G_OBJECT (mq), - "max-size-bytes", (guint) 2 * 1024 * 1024, - "max-size-time", (guint64) 0, "max-size-buffers", (guint) 0, NULL); - /* will expose the group */ - group->overrunsig = g_signal_connect (G_OBJECT (mq), "overrun", - G_CALLBACK (multi_queue_overrun_cb), group); - - gst_bin_add (GST_BIN (dbin), mq); - gst_element_set_state (mq, GST_STATE_PAUSED); - } - - GST_LOG_OBJECT (dbin, "Returning new group %p", group); + if (group->multiqueue) { + if (group->overrunsig) { + g_signal_handler_disconnect (group->multiqueue, group->overrunsig); + group->overrunsig = 0; + } - return group; + if (GST_OBJECT_PARENT (group->multiqueue) == GST_OBJECT_CAST (group->dbin)) + gst_bin_remove (GST_BIN_CAST (group->dbin), group->multiqueue); + if (!hide) { + gst_element_set_state (group->multiqueue, GST_STATE_NULL); + gst_object_unref (group->multiqueue); + group->multiqueue = NULL; + } + } + + GST_DEBUG_OBJECT (group->dbin, "%s group %p", (hide ? "Hided" : "Freed"), + group); + if (!hide) + g_slice_free (GstDecodeGroup, group); } -/* get_current_group: - * @dbin: the decodebin - * @create: create the group when not present - * @as_demux: create the group as a demuxer - * @created: result when the group was created +/* gst_decode_group_free: * - * Returns the current non-completed group. The dynamic refcount of the group is - * increased when dealing with a demuxer. + * Completely frees and removes the decode group and all + * it's children. * - * Returns: %NULL if no groups are available, or all groups are completed. + * Never call this from any streaming thread! + * + * Not MT-safe, call with parent's chain lock! + */ +static void +gst_decode_group_free (GstDecodeGroup * group) +{ + gst_decode_group_free_internal (group, FALSE); +} + +/* gst_decode_group_hide: + * + * Hide the decode group only, this means that + * all child endpads are removed from decodebin2 + * and all signals are unconnected. + * + * No element is set to NULL state and completely + * unreffed here. + * + * Can be called from streaming threads. + * + * Not MT-safe, call with parent's chain lock! + */ +static void +gst_decode_group_hide (GstDecodeGroup * group) +{ + gst_decode_group_free_internal (group, TRUE); +} + +/* gst_decode_group_new: + * @dbin: Parent decodebin + * @parent: Parent chain or %NULL + * + * Creates a new GstDecodeGroup. It is up to the caller to add it to the list + * of groups. */ static GstDecodeGroup * -get_current_group (GstDecodeBin * dbin, gboolean create, gboolean as_demux, - gboolean * created) +gst_decode_group_new (GstDecodeBin * dbin, GstDecodeChain * parent) { - GList *tmp; - GstDecodeGroup *group = NULL; + GstDecodeGroup *group = g_slice_new0 (GstDecodeGroup); + GstElement *mq; - DECODE_BIN_LOCK (dbin); - for (tmp = dbin->groups; tmp; tmp = g_list_next (tmp)) { - GstDecodeGroup *this = (GstDecodeGroup *) tmp->data; + GST_DEBUG_OBJECT (dbin, "Creating new group %p with parent chain %p", group, + parent); - GROUP_MUTEX_LOCK (this); - GST_LOG_OBJECT (dbin, "group %p, complete:%d", this, this->complete); + group->dbin = dbin; + group->parent = parent; - if (!this->complete) { - group = this; - GROUP_MUTEX_UNLOCK (this); - break; - } else { - GROUP_MUTEX_UNLOCK (this); - } - } - if (group == NULL && create) { - group = gst_decode_group_new (dbin, as_demux); - GST_LOG_OBJECT (dbin, "added group %p, demux %d", group, as_demux); - dbin->groups = g_list_prepend (dbin->groups, group); - if (created) - *created = TRUE; - /* demuxers are dynamic, we need no-more-pads or overrun now */ - if (as_demux) - group->nbdynamic++; + mq = group->multiqueue = gst_element_factory_make ("multiqueue", NULL); + if (G_UNLIKELY (!group->multiqueue)) { + gst_element_post_message (GST_ELEMENT_CAST (dbin), + gst_missing_element_message_new (GST_ELEMENT_CAST (dbin), + "multiqueue")); + GST_ELEMENT_ERROR (dbin, CORE, MISSING_PLUGIN, (NULL), ("no multiqueue!")); + g_slice_free (GstDecodeGroup, group); + return NULL; } - DECODE_BIN_UNLOCK (dbin); - GST_LOG_OBJECT (dbin, "Returning group %p", group); + g_object_set (G_OBJECT (mq), + "max-size-bytes", (guint) 2 * 1024 * 1024, + "max-size-time", (guint64) 0, "max-size-buffers", (guint) 0, NULL); + + group->overrunsig = g_signal_connect (G_OBJECT (mq), "overrun", + G_CALLBACK (multi_queue_overrun_cb), group); + + gst_bin_add (GST_BIN (dbin), gst_object_ref (mq)); + gst_element_set_state (mq, GST_STATE_PAUSED); return group; } @@ -1907,6 +2116,9 @@ gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, GstPad * pad) srcpad = NULL; + if (G_UNLIKELY (!group->multiqueue)) + return NULL; + if (!(sinkpad = gst_element_get_request_pad (group->multiqueue, "sink%d"))) { GST_ERROR_OBJECT (dbin, "Couldn't get sinkpad from multiqueue"); return NULL; @@ -1917,15 +2129,14 @@ gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, GstPad * pad) goto beach; } - group->reqpads = g_list_prepend (group->reqpads, sinkpad); + CHAIN_MUTEX_LOCK (group->parent); + group->reqpads = g_list_prepend (group->reqpads, gst_object_ref (sinkpad)); sinkname = gst_pad_get_name (sinkpad); nb = sinkname + 4; srcname = g_strdup_printf ("src%s", nb); g_free (sinkname); - GROUP_MUTEX_LOCK (group); - if (!(srcpad = gst_element_get_static_pad (group->multiqueue, srcname))) { GST_ERROR_OBJECT (dbin, "Couldn't get srcpad %s from multiqueue", srcname); goto chiringuito; @@ -1933,163 +2144,224 @@ gst_decode_group_control_demuxer_pad (GstDecodeGroup * group, GstPad * pad) chiringuito: g_free (srcname); - GROUP_MUTEX_UNLOCK (group); + CHAIN_MUTEX_UNLOCK (group->parent); beach: gst_object_unref (sinkpad); return srcpad; } +/* gst_decode_group_is_complete: + * + * Checks if the group is complete, this means that + * a) overrun of the multiqueue or no-more-pads happened + * b) all child chains are complete + * + * Not MT-safe, always call with decodebin expose lock + */ static gboolean -gst_decode_group_control_source_pad (GstDecodeGroup * group, - GstDecodePad * dpad) +gst_decode_group_is_complete (GstDecodeGroup * group) { - g_return_val_if_fail (group != NULL, FALSE); + GList *l; + gboolean complete = TRUE; - GST_DEBUG_OBJECT (dpad, "adding decode pad to group %p", group); + if (!group->overrun && !group->no_more_pads) { + complete = FALSE; + goto out; + } - /* FIXME : check if pad is already controlled */ - gst_decode_pad_activate (dpad, group); + for (l = group->children; l; l = l->next) { + GstDecodeChain *chain = l->data; - GROUP_MUTEX_LOCK (group); - group->endpads = g_list_prepend (group->endpads, gst_object_ref (dpad)); - GROUP_MUTEX_UNLOCK (group); + if (!gst_decode_chain_is_complete (chain)) { + complete = FALSE; + goto out; + } + } - return TRUE; +out: + GST_DEBUG_OBJECT (group->dbin, "Group %p is complete: %d", group, complete); + return complete; } -/* gst_decode_group_check_if_blocked: +/* gst_decode_chain_is_complete: * - * Call this when one of the pads blocked status has changed. - * If the group is complete and blocked, the group will be marked as blocked - * and will ghost/expose all pads on decodebin if the group is the current one. + * Returns TRUE if the chain is complete, this means either + * a) This chain is a dead end, i.e. we have no suitable plugins + * b) This chain ends in an endpad and this is blocked * - * Call with the group lock taken ! MT safe - * - * Returns: TRUE when the group is completely blocked and ready to be exposed. + * Not MT-safe, always call with decodebin expose lock */ static gboolean -gst_decode_group_check_if_blocked (GstDecodeGroup * group) +gst_decode_chain_is_complete (GstDecodeChain * chain) { - GstDecodeBin *dbin; - GList *tmp; - gboolean blocked = TRUE; - - dbin = group->dbin; - - GST_LOG_OBJECT (dbin, "group : %p , ->complete:%d , ->nbdynamic:%d", - group, group->complete, group->nbdynamic); + gboolean complete = FALSE; - /* don't do anything if group is not complete */ - if (!group->complete || group->nbdynamic) { - GST_DEBUG_OBJECT (group->dbin, "Group isn't complete yet"); - return FALSE; + if (chain->deadend || (chain->endpad && chain->endpad->blocked)) { + complete = TRUE; + goto out; } - for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { - GstDecodePad *dpad = (GstDecodePad *) tmp->data; - - if (!dpad->blocked) { - blocked = FALSE; - break; + if (chain->demuxer) { + if (chain->active_group + && gst_decode_group_is_complete (chain->active_group)) { + complete = TRUE; + goto out; } } - /* Update status of group */ - group->blocked = blocked; - GST_LOG_OBJECT (dbin, "group is blocked:%d", blocked); +out: + GST_DEBUG_OBJECT (chain->dbin, "Chain %p is complete: %d", chain, complete); + return complete; +} - return blocked; +/* check if the group is drained, meaning all pads have seen an EOS + * event. */ +static void +gst_decode_pad_handle_eos (GstDecodePad * pad) +{ + GstDecodeChain *chain = pad->chain; + + GST_LOG_OBJECT (pad->dbin, "chain : %p, pad %p", chain, pad); + pad->drained = TRUE; + gst_decode_chain_handle_eos (chain); } -/* activate the next group when there is one +/* gst_decode_chain_handle_eos: + * + * Checks if there are next groups in any parent chain + * to which we can switch or if everything is drained. * - * Returns: TRUE when group was the active group and there was a - * next group to activate. + * If there are groups to switch to, hide the current active + * one and expose the new one. + * + * MT-safe, don't call with chain lock! */ -static gboolean -gst_decode_bin_activate_next_group (GstDecodeBin * dbin, GstDecodeGroup * group) +static void +gst_decode_chain_handle_eos (GstDecodeChain * eos_chain) { - gboolean have_next = FALSE; - - DECODE_BIN_LOCK (dbin); - /* Check if there is a next group to activate */ - if ((group == dbin->activegroup) && dbin->groups) { - GstDecodeGroup *newgroup; + GstDecodeBin *dbin = eos_chain->dbin; + GstDecodeGroup *group = eos_chain->parent; + GstDecodeChain *chain = eos_chain; + gboolean drained; - /* get the next group */ - newgroup = (GstDecodeGroup *) dbin->groups->data; + g_return_if_fail (eos_chain->endpad); - GST_DEBUG_OBJECT (dbin, "Switching to new group"); + CHAIN_MUTEX_LOCK (chain); + while ((group = chain->parent)) { + CHAIN_MUTEX_UNLOCK (chain); + chain = group->parent; + CHAIN_MUTEX_LOCK (chain); - /* hide current group */ - gst_decode_group_hide (group); - /* expose next group */ - gst_decode_group_expose (newgroup); - - /* we have a next group */ - have_next = TRUE; + if (gst_decode_group_is_drained (group)) { + continue; + } + break; } - DECODE_BIN_UNLOCK (dbin); - return have_next; + drained = + chain->active_group ? gst_decode_group_is_drained (chain-> + active_group) : TRUE; + + /* Now either group == NULL and chain == dbin->decode_chain + * or chain is the lowest chain that has a non-drained group */ + if (chain->active_group && drained && chain->next_groups) { + GST_DEBUG_OBJECT (dbin, "Hiding current group %p", chain->active_group); + gst_decode_group_hide (chain->active_group); + chain->old_groups = g_list_prepend (chain->old_groups, chain->active_group); + GST_DEBUG_OBJECT (dbin, "Switching to next group %p", + chain->next_groups->data); + chain->active_group = chain->next_groups->data; + chain->next_groups = + g_list_delete_link (chain->next_groups, chain->next_groups); + CHAIN_MUTEX_UNLOCK (chain); + EXPOSE_LOCK (dbin); + if (gst_decode_chain_is_complete (dbin->decode_chain)) + gst_decode_bin_expose (dbin); + EXPOSE_UNLOCK (dbin); + } else if (!chain->active_group || drained) { + g_assert (chain == dbin->decode_chain); + CHAIN_MUTEX_UNLOCK (chain); + + GST_LOG_OBJECT (dbin, "all groups drained, fire signal"); + g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_DRAINED], 0, + NULL); + } else { + CHAIN_MUTEX_UNLOCK (chain); + GST_DEBUG_OBJECT (dbin, + "Current active group in chain %p is not drained yet", chain); + } } -/* check if the group is drained, meaning all pads have seen an EOS - * event. */ -static void -gst_decode_pad_handle_eos (GstDecodePad * pad) +/* gst_decode_group_is_drained: + * + * Check is this group is drained and cache this result. + * The group is drained if all child chains are drained. + * + * Not MT-safe, call with group->parent's lock */ +static gboolean +gst_decode_group_is_drained (GstDecodeGroup * group) { - GList *tmp; - GstDecodeBin *dbin; - GstDecodeGroup *group; + GList *l; gboolean drained = TRUE; - group = pad->group; - dbin = group->dbin; + if (group->drained) { + drained = TRUE; + goto out; + } - GST_LOG_OBJECT (dbin, "group : %p, pad %p", group, pad); + for (l = group->children; l; l = l->next) { + GstDecodeChain *chain = l->data; - GROUP_MUTEX_LOCK (group); - /* mark pad as drained */ - pad->drained = TRUE; - - /* Ensure we only emit the drained signal once, for this group */ - if (group->drained) - goto was_drained; + CHAIN_MUTEX_LOCK (chain); + if (!gst_decode_chain_is_drained (chain)) + drained = FALSE; + CHAIN_MUTEX_UNLOCK (chain); + if (!drained) + goto out; + } + group->drained = drained; - for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { - GstDecodePad *dpad = (GstDecodePad *) tmp->data; +out: + GST_DEBUG_OBJECT (group->dbin, "Group %p is drained: %d", group, drained); + return drained; +} - GST_LOG_OBJECT (dbin, "testing dpad %p %d", dpad, dpad->drained); +/* gst_decode_chain_is_drained: + * + * Check is the chain is drained, which means that + * either + * + * a) it's endpad is drained + * b) there are no pending pads, the active group is drained + * and there are no next groups + * + * Not MT-safe, call with chain lock + */ +static gboolean +gst_decode_chain_is_drained (GstDecodeChain * chain) +{ + gboolean drained = FALSE; - if (!dpad->drained) { - drained = FALSE; - break; - } + if (chain->endpad) { + drained = chain->endpad->drained; + goto out; } - group->drained = drained; - GROUP_MUTEX_UNLOCK (group); - - if (drained) { - /* the current group is completely drained, try to activate the next - * group. this function returns FALSE if there was no next group activated - * and so we are really drained. */ - if (!gst_decode_bin_activate_next_group (dbin, group)) { - /* no more groups to activate, we're completely drained now */ - GST_LOG_OBJECT (dbin, "all groups drained, fire signal"); - g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_DRAINED], 0, - NULL); - } + + if (chain->pending_pads) { + drained = FALSE; + goto out; } - return; -was_drained: - { - GST_LOG_OBJECT (dbin, "group was already drained"); - GROUP_MUTEX_UNLOCK (group); - return; + if (chain->active_group && gst_decode_group_is_drained (chain->active_group) + && !chain->next_groups) { + drained = TRUE; + goto out; } + +out: + GST_DEBUG_OBJECT (chain->dbin, "Chain %p is drained: %d", chain, drained); + return drained; } /* sort_end_pads: @@ -2100,7 +2372,6 @@ was_drained: * * Return: negative if a<b, 0 if a==b, positive if a>b */ - static gint sort_end_pads (GstDecodePad * da, GstDecodePad * db) { @@ -2146,74 +2417,62 @@ sort_end_pads (GstDecodePad * da, GstDecodePad * db) return va - vb; } -/* gst_decode_group_expose: - * - * Expose this group's pads. - * - * Not MT safe, please take the decodebin lock - */ +/* Must only be called if the toplevel chain is complete and blocked! */ +/* Not MT-safe, call with decodebin expose lock! */ static gboolean -gst_decode_group_expose (GstDecodeGroup * group) +gst_decode_bin_expose (GstDecodeBin * dbin) { - GList *tmp; - GList *next = NULL; - GstDecodeBin *dbin; + GList *tmp, *endpads = NULL; + gboolean already_exposed = TRUE; - dbin = group->dbin; + GST_DEBUG_OBJECT (dbin, "Exposing currently active chains/groups"); - GST_DEBUG_OBJECT (dbin, "going to expose group %p", group); - - if (group->nbdynamic) { - GST_DEBUG_OBJECT (dbin, - "Group %p still has %d dynamic objects, not exposing yet", group, - group->nbdynamic); + /* Get the pads that we're going to expose and mark things as exposed */ + if (!gst_decode_chain_expose (dbin->decode_chain, &endpads)) { + g_list_foreach (endpads, (GFunc) gst_object_unref, NULL); + g_list_free (endpads); + GST_ERROR_OBJECT (dbin, "Broken chain/group tree"); + g_return_val_if_reached (FALSE); return FALSE; } - - if (dbin->activegroup == group) { - GST_DEBUG_OBJECT (dbin, "Group %p is already exposed, all is fine", group); - return TRUE; + if (endpads == NULL) { + GST_WARNING_OBJECT (dbin, "No suitable plugins found"); + GST_ELEMENT_ERROR (dbin, CORE, MISSING_PLUGIN, (NULL), + ("no suitable plugins found")); + return FALSE; } - if (group->multiqueue) { - /* update runtime limits. At runtime, we try to keep the amount of buffers - * in the queues as low as possible (but at least 5 buffers). */ - g_object_set (G_OBJECT (group->multiqueue), - "max-size-bytes", 2 * 1024 * 1024, "max-size-buffers", 5, NULL); - /* we can now disconnect any overrun signal, which is used to expose the - * group. */ - if (group->overrunsig) { - GST_LOG_OBJECT (dbin, "Disconnecting overrun"); - g_signal_handler_disconnect (group->multiqueue, group->overrunsig); - group->overrunsig = 0; - } - } + /* Check if this was called when everything was exposed already */ + for (tmp = endpads; tmp && already_exposed; tmp = tmp->next) { + GstDecodePad *dpad = tmp->data; - if (dbin->activegroup) { - GST_DEBUG_OBJECT (dbin, - "another group %p is already exposed, waiting for EOS", - dbin->activegroup); + already_exposed &= dpad->exposed; + if (!already_exposed) + break; + } + if (already_exposed) { + GST_DEBUG_OBJECT (dbin, "Everything was exposed already!"); + g_list_foreach (endpads, (GFunc) gst_object_unref, NULL); + g_list_free (endpads); return TRUE; } - if (!dbin->groups || (group != (GstDecodeGroup *) dbin->groups->data)) { - GST_WARNING_OBJECT (dbin, "Group %p is not the first group to expose", - group); - return FALSE; - } + /* Set all already exposed pads to blocked */ + for (tmp = endpads; tmp; tmp = tmp->next) { + GstDecodePad *dpad = tmp->data; - GST_LOG_OBJECT (dbin, "Exposing group %p", group); + if (dpad->exposed) + gst_decode_pad_set_blocked (dpad, TRUE); + } /* re-order pads : video, then audio, then others */ - group->endpads = g_list_sort (group->endpads, (GCompareFunc) sort_end_pads); + endpads = g_list_sort (endpads, (GCompareFunc) sort_end_pads); /* Expose pads */ - for (tmp = group->endpads; tmp; tmp = next) { + for (tmp = endpads; tmp; tmp = tmp->next) { GstDecodePad *dpad = (GstDecodePad *) tmp->data; gchar *padname; - next = g_list_next (tmp); - /* 1. rewrite name */ padname = g_strdup_printf ("src%d", dbin->nbpads); dbin->nbpads++; @@ -2223,18 +2482,19 @@ gst_decode_group_expose (GstDecodeGroup * group) g_free (padname); /* 2. activate and add */ - if (!gst_element_add_pad (GST_ELEMENT (dbin), GST_PAD (dpad))) { + if (!dpad->exposed + && !gst_element_add_pad (GST_ELEMENT (dbin), GST_PAD (dpad))) { /* not really fatal, we can try to add the other pads */ g_warning ("error adding pad to decodebin2"); continue; } - dpad->added = TRUE; + dpad->exposed = TRUE; /* 3. emit signal */ GST_DEBUG_OBJECT (dbin, "emitting new-decoded-pad"); g_signal_emit (G_OBJECT (dbin), gst_decode_bin_signals[SIGNAL_NEW_DECODED_PAD], 0, dpad, - (next == NULL)); + (tmp->next == NULL)); GST_DEBUG_OBJECT (dbin, "emitted new-decoded-pad"); } @@ -2245,223 +2505,73 @@ gst_decode_group_expose (GstDecodeGroup * group) /* 4. Unblock internal pads. The application should have connected stuff now * so that streaming can continue. */ - for (tmp = group->endpads; tmp; tmp = next) { + for (tmp = endpads; tmp; tmp = tmp->next) { GstDecodePad *dpad = (GstDecodePad *) tmp->data; - next = g_list_next (tmp); - GST_DEBUG_OBJECT (dpad, "unblocking"); gst_decode_pad_unblock (dpad); GST_DEBUG_OBJECT (dpad, "unblocked"); + gst_object_unref (dpad); } - - dbin->activegroup = group; - - /* pop off the first group */ - if (dbin->groups && dbin->groups->data) { - GST_LOG_OBJECT (dbin, "removed group %p", dbin->groups->data); - dbin->groups = g_list_delete_link (dbin->groups, dbin->groups); - } else { - GST_LOG_OBJECT (dbin, "no more groups"); - } + g_list_free (endpads); do_async_done (dbin); - - group->exposed = TRUE; - - GST_LOG_OBJECT (dbin, "Group %p exposed", group); + GST_DEBUG_OBJECT (dbin, "Exposed everything"); return TRUE; } -/* must be called with the decodebin lock */ -static void -gst_decode_group_hide (GstDecodeGroup * group) -{ - GList *tmp; - GstDecodeBin *dbin; - - dbin = group->dbin; - - GST_LOG_OBJECT (dbin, "Hiding group %p", group); - - if (group != dbin->activegroup) { - GST_WARNING_OBJECT (dbin, "This group is not the active one, ignoring"); - return; - } - - GROUP_MUTEX_LOCK (group); - /* Remove ghost pads */ - for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { - GstDecodePad *dpad = (GstDecodePad *) tmp->data; - - if (dpad->added) - gst_element_remove_pad (GST_ELEMENT (group->dbin), GST_PAD (dpad)); - dpad->added = FALSE; - } - group->exposed = FALSE; - GROUP_MUTEX_UNLOCK (group); - - group->dbin->activegroup = NULL; - group->dbin->oldgroups = g_list_prepend (group->dbin->oldgroups, group); -} - -static void -deactivate_free_recursive (GstDecodeGroup * group, GstElement * element) +/* gst_decode_chain_expose: + * + * Check if the chain can be exposed and add all endpads + * to the endpads list. + * + * Also update the active group's multiqueue to the + * runtime limits. + * + * Not MT-safe, call with decodebin expose lock! * + */ +static gboolean +gst_decode_chain_expose (GstDecodeChain * chain, GList ** endpads) { - GstIterator *it; - GstIteratorResult res; - gpointer point; - GstDecodeBin *dbin; - - dbin = group->dbin; - - GST_LOG_OBJECT (dbin, "element:%s", GST_ELEMENT_NAME (element)); - - /* call on downstream elements */ - it = gst_element_iterate_src_pads (element); - -restart: - - while (1) { - res = gst_iterator_next (it, &point); - switch (res) { - case GST_ITERATOR_DONE: - goto done; - case GST_ITERATOR_RESYNC: - gst_iterator_resync (it); - goto restart; - case GST_ITERATOR_ERROR: - { - GST_WARNING_OBJECT (dbin, - "Had an error while iterating source pads of element: %s", - GST_ELEMENT_NAME (element)); - goto beach; - } - case GST_ITERATOR_OK: - { - GstPad *pad = GST_PAD (point); - GstPad *peerpad = NULL; - - if ((peerpad = gst_pad_get_peer (pad))) { - GstObject *parent; + GstDecodeGroup *group; + GList *l; - parent = gst_pad_get_parent (peerpad); - gst_object_unref (peerpad); + if (chain->deadend) + return TRUE; - if (parent && GST_IS_ELEMENT (parent)) - deactivate_free_recursive (group, GST_ELEMENT (parent)); - if (parent) - gst_object_unref (parent); - } - } - break; - default: - break; - } + if (chain->endpad) { + if (!chain->endpad->blocked) + return FALSE; + *endpads = g_list_prepend (*endpads, gst_object_ref (chain->endpad)); + return TRUE; } -done: - gst_element_set_state (element, GST_STATE_NULL); - GST_OBJECT_LOCK (dbin); - /* remove possible subtitle element */ - dbin->subtitles = g_list_remove (dbin->subtitles, element); - GST_OBJECT_UNLOCK (dbin); - gst_bin_remove (GST_BIN (dbin), element); - -beach: - gst_iterator_free (it); - - return; -} - -static void -gst_decode_group_free (GstDecodeGroup * group) -{ - GstDecodeBin *dbin; - GList *tmp; - - dbin = group->dbin; - - GST_LOG_OBJECT (dbin, "group %p", group); - - GROUP_MUTEX_LOCK (group); - - /* remove exposed pads */ - if (group == dbin->activegroup) { - for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) { - GstDecodePad *dpad = (GstDecodePad *) tmp->data; + group = chain->active_group; + if (!group) + return FALSE; + if (!group->no_more_pads && !group->overrun) + return FALSE; - if (dpad->added) - gst_element_remove_pad (GST_ELEMENT (dbin), GST_PAD (dpad)); - dpad->added = FALSE; - } + /* update runtime limits. At runtime, we try to keep the amount of buffers + * in the queues as low as possible (but at least 5 buffers). */ + g_object_set (G_OBJECT (group->multiqueue), + "max-size-bytes", 2 * 1024 * 1024, "max-size-buffers", 5, NULL); + /* we can now disconnect any overrun signal, which is used to expose the + * group. */ + if (group->overrunsig) { + GST_LOG_OBJECT (group->dbin, "Disconnecting overrun"); + g_signal_handler_disconnect (group->multiqueue, group->overrunsig); + group->overrunsig = 0; } - /* Clear all GstDecodePad */ - for (tmp = group->endpads; tmp; tmp = g_list_next (tmp)) - gst_object_unref (tmp->data); - g_list_free (group->endpads); - group->endpads = NULL; + for (l = group->children; l; l = l->next) { + GstDecodeChain *childchain = l->data; - /* release request pads */ - for (tmp = group->reqpads; tmp; tmp = g_list_next (tmp)) { - gst_element_release_request_pad (group->multiqueue, GST_PAD (tmp->data)); + if (!gst_decode_chain_expose (childchain, endpads)) + return FALSE; } - g_list_free (group->reqpads); - group->reqpads = NULL; - /* disconnect signal handlers on multiqueue */ - if (group->multiqueue) { - if (group->overrunsig) - g_signal_handler_disconnect (group->multiqueue, group->overrunsig); - deactivate_free_recursive (group, group->multiqueue); - } - - /* remove all elements */ - - GROUP_MUTEX_UNLOCK (group); - - g_mutex_free (group->lock); - g_free (group); -} - -/* gst_decode_group_set_complete: - * - * Mark the group as complete. This means no more streams will be controlled - * through this group. This method is usually called when we got no_more_pads or - * when we added the last pad not from a demuxer. - * - * When this method is called, it is possible that some dynamic plugging is - * going on in streaming threads. We decrement the dynamic counter and when it - * reaches zero, we check if all of our pads are blocked before we finally - * expose the group. - * - * MT safe - */ -static void -gst_decode_group_set_complete (GstDecodeGroup * group) -{ - gboolean expose = FALSE; - GstDecodeBin *dbin; - - dbin = group->dbin; - - GST_LOG_OBJECT (dbin, "Setting group %p to COMPLETE", group); - - GROUP_MUTEX_LOCK (group); - group->complete = TRUE; - if (group->nbdynamic > 0) - group->nbdynamic--; - expose = gst_decode_group_check_if_blocked (group); - GROUP_MUTEX_UNLOCK (group); - - /* don't do anything if not blocked completely */ - if (expose) { - DECODE_BIN_LOCK (dbin); - if (!gst_decode_group_expose (group)) - GST_WARNING_OBJECT (dbin, "Couldn't expose group"); - DECODE_BIN_UNLOCK (dbin); - } + return TRUE; } /************************* @@ -2476,37 +2586,34 @@ gst_decode_pad_class_init (GstDecodePadClass * klass) static void gst_decode_pad_init (GstDecodePad * pad) { - pad->group = NULL; + pad->chain = NULL; pad->blocked = FALSE; + pad->exposed = FALSE; pad->drained = FALSE; gst_object_ref (pad); gst_object_sink (pad); } static void -source_pad_blocked_cb (GstPad * opad, gboolean blocked, GstDecodePad * dpad) +source_pad_blocked_cb (GstPad * pad, gboolean blocked, GstDecodePad * dpad) { - GstDecodeGroup *group; + GstDecodeChain *chain; GstDecodeBin *dbin; - gboolean expose = FALSE; - group = dpad->group; - dbin = group->dbin; + chain = dpad->chain; + dbin = chain->dbin; - GST_LOG_OBJECT (dpad, "blocked:%d, dpad->group:%p", blocked, group); + GST_LOG_OBJECT (dpad, "blocked:%d, dpad->chain:%p", blocked, chain); - GROUP_MUTEX_LOCK (group); - /* Update this GstDecodePad status */ dpad->blocked = blocked; - if (blocked) - expose = gst_decode_group_check_if_blocked (group); - GROUP_MUTEX_UNLOCK (group); - if (expose) { - DECODE_BIN_LOCK (dbin); - if (!gst_decode_group_expose (group)) - GST_WARNING_OBJECT (dbin, "Couldn't expose group"); - DECODE_BIN_UNLOCK (dbin); + if (dpad->blocked) { + EXPOSE_LOCK (dbin); + if (gst_decode_chain_is_complete (dbin->decode_chain)) { + if (!gst_decode_bin_expose (dbin)) + GST_WARNING_OBJECT (dbin, "Couldn't expose group"); + } + EXPOSE_UNLOCK (dbin); } } @@ -2533,7 +2640,7 @@ gst_decode_pad_set_blocked (GstDecodePad * dpad, gboolean blocked) GstDecodeBin *dbin = dpad->dbin; GstPad *opad; - DECODE_BIN_DYN_LOCK (dbin); + DYN_LOCK (dbin); opad = gst_ghost_pad_get_target (GST_GHOST_PAD_CAST (dpad)); if (!opad) goto out; @@ -2550,13 +2657,16 @@ gst_decode_pad_set_blocked (GstDecodePad * dpad, gboolean blocked) dbin->blocked_pads = g_list_prepend (dbin->blocked_pads, dpad); } } else { - if (g_list_find (dbin->blocked_pads, dpad)) + GList *l; + + if ((l = g_list_find (dbin->blocked_pads, dpad))) { gst_object_unref (dpad); - dbin->blocked_pads = g_list_remove (dbin->blocked_pads, dpad); + dbin->blocked_pads = g_list_delete_link (dbin->blocked_pads, l); + } } gst_object_unref (opad); out: - DECODE_BIN_DYN_UNLOCK (dbin); + DYN_UNLOCK (dbin); } static void @@ -2567,11 +2677,11 @@ gst_decode_pad_add_drained_check (GstDecodePad * dpad) } static void -gst_decode_pad_activate (GstDecodePad * dpad, GstDecodeGroup * group) +gst_decode_pad_activate (GstDecodePad * dpad, GstDecodeChain * chain) { - g_return_if_fail (group != NULL); + g_return_if_fail (chain != NULL); - dpad->group = group; + dpad->chain = chain; gst_pad_set_active (GST_PAD (dpad), TRUE); gst_decode_pad_set_blocked (dpad, TRUE); gst_decode_pad_add_drained_check (dpad); @@ -2588,7 +2698,7 @@ gst_decode_pad_unblock (GstDecodePad * dpad) * Creates a new GstDecodePad for the given pad. */ static GstDecodePad * -gst_decode_pad_new (GstDecodeBin * dbin, GstPad * pad, GstDecodeGroup * group) +gst_decode_pad_new (GstDecodeBin * dbin, GstPad * pad, GstDecodeChain * chain) { GstDecodePad *dpad; @@ -2597,7 +2707,7 @@ gst_decode_pad_new (GstDecodeBin * dbin, GstPad * pad, GstDecodeGroup * group) NULL); gst_ghost_pad_construct (GST_GHOST_PAD (dpad)); gst_ghost_pad_set_target (GST_GHOST_PAD (dpad), pad); - dpad->group = group; + dpad->chain = chain; dpad->dbin = dbin; return dpad; @@ -2663,13 +2773,12 @@ find_sink_pad (GstElement * element) static void unblock_pads (GstDecodeBin * dbin) { - GList *tmp, *next; + GList *tmp; - for (tmp = dbin->blocked_pads; tmp; tmp = next) { + for (tmp = dbin->blocked_pads; tmp; tmp = tmp->next) { GstDecodePad *dpad = (GstDecodePad *) tmp->data; GstPad *opad = gst_ghost_pad_get_target (GST_GHOST_PAD_CAST (dpad)); - next = g_list_next (tmp); if (!opad) continue; @@ -2701,20 +2810,20 @@ gst_decode_bin_change_state (GstElement * element, GstStateChange transition) goto missing_typefind; break; case GST_STATE_CHANGE_READY_TO_PAUSED: - DECODE_BIN_DYN_LOCK (dbin); + DYN_LOCK (dbin); GST_LOG_OBJECT (dbin, "clearing shutdown flag"); dbin->shutdown = FALSE; - DECODE_BIN_DYN_UNLOCK (dbin); + DYN_UNLOCK (dbin); dbin->have_type = FALSE; ret = GST_STATE_CHANGE_ASYNC; do_async_start (dbin); break; case GST_STATE_CHANGE_PAUSED_TO_READY: - DECODE_BIN_DYN_LOCK (dbin); + DYN_LOCK (dbin); GST_LOG_OBJECT (dbin, "setting shutdown flag"); dbin->shutdown = TRUE; unblock_pads (dbin); - DECODE_BIN_DYN_UNLOCK (dbin); + DYN_UNLOCK (dbin); default: break; } @@ -2733,11 +2842,12 @@ gst_decode_bin_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: do_async_done (dbin); - gst_decode_bin_remove_groups (dbin); + if (dbin->decode_chain) { + gst_decode_chain_free (dbin->decode_chain); + dbin->decode_chain = NULL; + } break; case GST_STATE_CHANGE_READY_TO_NULL: - gst_decode_bin_remove_groups (dbin); - break; default: break; } |