summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian.droege@collabora.co.uk>2012-01-27 16:18:00 +0100
committerSebastian Dröge <sebastian.droege@collabora.co.uk>2012-01-27 16:19:33 +0100
commitbf0964b63a1283eb38a6d803e7a54b07b9e9a7e5 (patch)
tree191030294b8fbfe29479ec51d4c124aca690ade7
parentfe9e2844639fa559a90131ad629eed0ace917f30 (diff)
typefind: Do typefinding from a separate thread and not from the state change function
-rw-r--r--plugins/elements/gsttypefindelement.c375
-rw-r--r--plugins/elements/gsttypefindelement.h5
2 files changed, 263 insertions, 117 deletions
diff --git a/plugins/elements/gsttypefindelement.c b/plugins/elements/gsttypefindelement.c
index 1f7e03a955..7134efb0ac 100644
--- a/plugins/elements/gsttypefindelement.c
+++ b/plugins/elements/gsttypefindelement.c
@@ -157,8 +157,10 @@ static GstFlowReturn gst_type_find_element_getrange (GstPad * srcpad,
static GstStateChangeReturn
gst_type_find_element_change_state (GstElement * element,
GstStateChange transition);
-static gboolean gst_type_find_element_activate (GstPad * pad,
+static gboolean gst_type_find_element_activate_sink (GstPad * pad,
GstObject * parent);
+static gboolean gst_type_find_element_activate_sink_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
static gboolean gst_type_find_element_activate_src_mode (GstPad * pad,
GstObject * parent, GstPadMode mode, gboolean active);
static GstFlowReturn
@@ -167,6 +169,8 @@ gst_type_find_element_chain_do_typefinding (GstTypeFindElement * typefind,
static void gst_type_find_element_send_cached_events (GstTypeFindElement *
typefind);
+static void gst_type_find_element_loop (GstPad * pad);
+
static guint gst_type_find_element_signals[LAST_SIGNAL] = { 0 };
static void
@@ -260,7 +264,9 @@ gst_type_find_element_init (GstTypeFindElement * typefind)
"sink");
gst_pad_set_activate_function (typefind->sink,
- GST_DEBUG_FUNCPTR (gst_type_find_element_activate));
+ GST_DEBUG_FUNCPTR (gst_type_find_element_activate_sink));
+ gst_pad_set_activatemode_function (typefind->sink,
+ GST_DEBUG_FUNCPTR (gst_type_find_element_activate_sink_mode));
gst_pad_set_chain_function (typefind->sink,
GST_DEBUG_FUNCPTR (gst_type_find_element_chain));
gst_pad_set_event_function (typefind->sink,
@@ -410,21 +416,85 @@ out:
return res;
}
-#if 0
-static const GstEventMask *
-gst_type_find_element_src_event_mask (GstPad * pad)
+static gboolean
+gst_type_find_element_seek (GstTypeFindElement * typefind, GstEvent * event)
{
- static const GstEventMask mask[] = {
- {GST_EVENT_SEEK,
- GST_SEEK_METHOD_SET | GST_SEEK_METHOD_CUR | GST_SEEK_METHOD_END |
- GST_SEEK_FLAG_FLUSH},
- /* add more if you want, event masks suck and need to die anyway */
- {0,}
- };
-
- return mask;
+ GstSeekFlags flags;
+ GstSeekType cur_type, stop_type;
+ GstFormat format;
+ gboolean flush;
+ gdouble rate;
+ gint64 cur, stop;
+ GstSegment seeksegment = { 0, };
+
+ gst_event_parse_seek (event, &rate, &format, &flags, &cur_type, &cur,
+ &stop_type, &stop);
+
+ /* we can only seek on bytes */
+ if (format != GST_FORMAT_BYTES) {
+ GST_DEBUG_OBJECT (typefind, "Can only seek on BYTES");
+ return FALSE;
+ }
+
+ /* copy segment, we need this because we still need the old
+ * segment when we close the current segment. */
+ memcpy (&seeksegment, &typefind->segment, sizeof (GstSegment));
+
+ GST_DEBUG_OBJECT (typefind, "configuring seek");
+ gst_segment_do_seek (&seeksegment, rate, format, flags,
+ cur_type, cur, stop_type, stop, NULL);
+
+ flush = ! !(flags & GST_SEEK_FLAG_FLUSH);
+
+ GST_DEBUG_OBJECT (typefind, "New segment %" GST_SEGMENT_FORMAT, &seeksegment);
+
+ if (flush) {
+ GST_DEBUG_OBJECT (typefind, "Starting flush");
+ gst_pad_push_event (typefind->sink, gst_event_new_flush_start ());
+ gst_pad_push_event (typefind->src, gst_event_new_flush_start ());
+ } else {
+ GST_DEBUG_OBJECT (typefind, "Non-flushing seek, pausing task");
+ gst_pad_pause_task (typefind->sink);
+ }
+
+ /* now grab the stream lock so that streaming cannot continue, for
+ * non flushing seeks when the element is in PAUSED this could block
+ * forever. */
+ GST_DEBUG_OBJECT (typefind, "Waiting for streaming to stop");
+ GST_PAD_STREAM_LOCK (typefind->sink);
+
+ if (flush) {
+ GST_DEBUG_OBJECT (typefind, "Stopping flush");
+ gst_pad_push_event (typefind->sink, gst_event_new_flush_stop (TRUE));
+ gst_pad_push_event (typefind->src, gst_event_new_flush_stop (TRUE));
+ }
+
+ /* now update the real segment info */
+ GST_DEBUG_OBJECT (typefind, "Committing new seek segment");
+ memcpy (&typefind->segment, &seeksegment, sizeof (GstSegment));
+ typefind->offset = typefind->segment.start;
+
+ /* notify start of new segment */
+ if (typefind->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+ GstMessage *msg;
+
+ msg = gst_message_new_segment_start (GST_OBJECT (typefind),
+ GST_FORMAT_BYTES, typefind->segment.start);
+ gst_element_post_message (GST_ELEMENT (typefind), msg);
+ }
+
+ typefind->need_segment = TRUE;
+
+ /* restart our task since it might have been stopped when we did the
+ * flush. */
+ gst_pad_start_task (typefind->sink,
+ (GstTaskFunction) gst_type_find_element_loop, typefind->sink);
+
+ /* streaming can continue now */
+ GST_PAD_STREAM_UNLOCK (typefind->sink);
+
+ return TRUE;
}
-#endif
static gboolean
gst_type_find_element_src_event (GstPad * pad, GstObject * parent,
@@ -437,7 +507,14 @@ gst_type_find_element_src_event (GstPad * pad, GstObject * parent,
gst_mini_object_unref (GST_MINI_OBJECT_CAST (event));
return FALSE;
}
- return gst_pad_push_event (typefind->sink, event);
+
+ /* Only handle seeks here if driving the pipeline */
+ if (typefind->segment.format != GST_FORMAT_UNDEFINED &&
+ GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
+ return gst_type_find_element_seek (typefind, event);
+ } else {
+ return gst_pad_push_event (typefind->sink, event);
+ }
}
static void
@@ -899,61 +976,20 @@ gst_type_find_element_activate_src_mode (GstPad * pad, GstObject * parent,
return res;
}
-static gboolean
-gst_type_find_element_activate (GstPad * pad, GstObject * parent)
+static void
+gst_type_find_element_loop (GstPad * pad)
{
- GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
- GstCaps *found_caps = NULL;
GstTypeFindElement *typefind;
- GstQuery *query;
- gboolean pull_mode;
+ GstFlowReturn ret = GST_FLOW_OK;
- typefind = GST_TYPE_FIND_ELEMENT (parent);
+ typefind = GST_TYPE_FIND_ELEMENT (GST_PAD_PARENT (pad));
- /* if we have force caps, use those */
- GST_OBJECT_LOCK (typefind);
- if (typefind->force_caps) {
- found_caps = gst_caps_ref (typefind->force_caps);
- probability = GST_TYPE_FIND_MAXIMUM;
- GST_OBJECT_UNLOCK (typefind);
- goto done;
- }
- GST_OBJECT_UNLOCK (typefind);
-
- /* 1. try to activate in pull mode. if not, switch to push and succeed.
- 2. try to pull type find.
- 3. deactivate pull mode.
- 4. src pad might have been activated push by the state change. deactivate.
- 5. if we didn't find any caps, try getting the uri extension by doing an uri
- query.
- 6. if we didn't find any caps, fail.
- 7. emit have-type; maybe the app connected the source pad to something.
- 8. if the sink pad is activated, we are in pull mode. succeed.
- otherwise activate both pads in push mode and succeed.
- */
-
- /* 1 */
- query = gst_query_new_scheduling ();
-
- if (!gst_pad_peer_query (pad, query)) {
- gst_query_unref (query);
- goto typefind_push;
- }
-
- pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
- gst_query_unref (query);
-
- if (!pull_mode)
- goto typefind_push;
-
- if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE))
- goto typefind_push;
-
- /* 2 */
- GST_DEBUG_OBJECT (typefind, "find type in pull mode");
-
- {
+ if (typefind->mode == MODE_TYPEFIND) {
GstPad *peer;
+ GstCaps *found_caps = NULL;
+ GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
+
+ GST_DEBUG_OBJECT (typefind, "find type in pull mode");
peer = gst_pad_get_peer (pad);
if (peer) {
@@ -963,8 +999,9 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
if (!gst_pad_query_duration (peer, GST_FORMAT_BYTES, &size)) {
GST_WARNING_OBJECT (typefind, "Could not query upstream length!");
gst_object_unref (peer);
- gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
- return FALSE;
+
+ ret = GST_FLOW_ERROR;
+ goto pause;
}
/* the size if 0, we cannot continue */
@@ -973,8 +1010,8 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND,
(_("Stream contains no data.")), ("Can't typefind empty stream"));
gst_object_unref (peer);
- gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
- return FALSE;
+ ret = GST_FLOW_ERROR;
+ goto pause;
}
ext = gst_type_find_get_extension (typefind, pad);
@@ -989,68 +1026,172 @@ gst_type_find_element_activate (GstPad * pad, GstObject * parent)
gst_object_unref (peer);
}
- }
- /* the type find helpers might have triggered setcaps here (due to upstream)
- * setting caps on buffers, which emits typefound signal and an element
- * could have been linked and have its pads activated
- *
- * If we deactivate the pads in the following steps we might mess up
- * downstream element. We should prevent that.
- */
- if (typefind->mode == MODE_NORMAL) {
- /* this means we already emitted typefound */
- GST_DEBUG ("Already managed to typefind !");
- goto really_done;
+ if (!found_caps || probability < typefind->min_probability) {
+ GST_DEBUG ("Trying to guess using extension");
+ found_caps =
+ gst_type_find_guess_by_extension (typefind, pad, &probability);
+ }
+
+ if (!found_caps || probability < typefind->min_probability) {
+ GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
+ gst_caps_replace (&found_caps, NULL);
+ ret = GST_FLOW_ERROR;
+ goto pause;
+ }
+
+ GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
+ g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
+ 0, probability, found_caps);
+ typefind->mode = MODE_NORMAL;
+ } else if (typefind->mode == MODE_NORMAL) {
+ GstBuffer *outbuf;
+
+ if (typefind->need_segment) {
+ typefind->need_segment = FALSE;
+ gst_pad_push_event (typefind->src,
+ gst_event_new_segment (&typefind->segment));
+ }
+
+ /* Pull 4k blocks and send downstream */
+ ret = gst_pad_pull_range (typefind->sink, typefind->offset, 4096, &outbuf);
+ if (ret != GST_FLOW_OK)
+ goto pause;
+
+ typefind->offset += 4096;
+
+ ret = gst_pad_push (typefind->src, outbuf);
+ if (ret != GST_FLOW_OK)
+ goto pause;
+ } else {
+ /* Error out */
+ ret = GST_FLOW_ERROR;
+ goto pause;
}
- /* 3 */
- GST_DEBUG ("Deactivate pull mode");
- gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, FALSE);
+ return;
-#if 0
- /* 4 */
- GST_DEBUG ("Deactivate push mode mode");
- gst_pad_activate_mode (typefind->src, GST_PAD_MODE_PUSH, FALSE);
-#endif
+pause:
+ {
+ const gchar *reason = gst_flow_get_name (ret);
+ gboolean push_eos = FALSE;
+
+ GST_LOG_OBJECT (typefind, "pausing task, reason %s", reason);
+ gst_pad_pause_task (typefind->sink);
+
+ if (ret == GST_FLOW_EOS) {
+ /* perform EOS logic */
+
+ if (typefind->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+ gint64 stop;
- /* 5 */
- if (!found_caps || probability < typefind->min_probability) {
- GST_DEBUG ("Trying to guess using extension");
- found_caps = gst_type_find_guess_by_extension (typefind, pad, &probability);
+ /* for segment playback we need to post when (in stream time)
+ * we stopped, this is either stop (when set) or the duration. */
+ if ((stop = typefind->segment.stop) == -1)
+ stop = typefind->offset;
+
+ GST_LOG_OBJECT (typefind, "Sending segment done, at end of segment");
+ gst_element_post_message (GST_ELEMENT (typefind),
+ gst_message_new_segment_done (GST_OBJECT (typefind),
+ GST_FORMAT_BYTES, stop));
+ } else {
+ push_eos = TRUE;
+ }
+ } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS) {
+ /* for fatal errors we post an error message */
+ GST_ELEMENT_ERROR (typefind, STREAM, FAILED, (NULL),
+ ("stream stopped, reason %s", reason));
+ push_eos = TRUE;
+ }
+ if (push_eos) {
+ /* send EOS, and prevent hanging if no streams yet */
+ GST_LOG_OBJECT (typefind, "Sending EOS, at end of stream");
+ gst_pad_push_event (typefind->src, gst_event_new_eos ());
+ }
+ return;
}
+}
- /* 6 */
- if (!found_caps || probability < typefind->min_probability) {
- GST_ELEMENT_ERROR (typefind, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
- gst_caps_replace (&found_caps, NULL);
- return FALSE;
+static gboolean
+gst_type_find_element_activate_sink_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
+{
+ GstTypeFindElement *typefind;
+
+ typefind = GST_TYPE_FIND_ELEMENT (parent);
+
+ switch (mode) {
+ case GST_PAD_MODE_PULL:
+ if (active) {
+ gst_segment_init (&typefind->segment, GST_FORMAT_BYTES);
+ typefind->need_segment = TRUE;
+ typefind->offset = 0;
+ gst_pad_start_task (pad, (GstTaskFunction) gst_type_find_element_loop,
+ pad);
+ } else {
+ gst_pad_stop_task (pad);
+ }
+ return TRUE;
+ break;
+ case GST_PAD_MODE_PUSH:
+ if (active)
+ start_typefinding (typefind);
+ else
+ stop_typefinding (typefind);
+
+ return TRUE;
+ break;
+ default:
+ return FALSE;
}
+}
-done:
- /* 7 */
- GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
- g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
- 0, probability, found_caps);
- typefind->mode = MODE_NORMAL;
+static gboolean
+gst_type_find_element_activate_sink (GstPad * pad, GstObject * parent)
+{
+ GstTypeFindElement *typefind;
+ GstQuery *query;
+ gboolean pull_mode;
+ GstCaps *found_caps = NULL;
+ GstTypeFindProbability probability = GST_TYPE_FIND_NONE;
-really_done:
- gst_caps_unref (found_caps);
+ typefind = GST_TYPE_FIND_ELEMENT (parent);
- /* 8 */
- if (gst_pad_is_active (pad))
- return TRUE;
- else {
- gboolean ret;
+ /* if we have force caps, use those */
+ GST_OBJECT_LOCK (typefind);
+ if (typefind->force_caps) {
+ found_caps = gst_caps_ref (typefind->force_caps);
+ probability = GST_TYPE_FIND_MAXIMUM;
+ GST_OBJECT_UNLOCK (typefind);
- GST_DEBUG ("Activating in push mode");
- ret = gst_pad_activate_mode (typefind->src, GST_PAD_MODE_PUSH, TRUE);
- ret &= gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
- return ret;
+ GST_DEBUG ("Emiting found caps %" GST_PTR_FORMAT, found_caps);
+ g_signal_emit (typefind, gst_type_find_element_signals[HAVE_TYPE],
+ 0, probability, found_caps);
+ typefind->mode = MODE_NORMAL;
+ goto typefind_push;
}
+ GST_OBJECT_UNLOCK (typefind);
+
+ query = gst_query_new_scheduling ();
+
+ if (!gst_pad_peer_query (pad, query)) {
+ gst_query_unref (query);
+ goto typefind_push;
+ }
+
+ pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
+ gst_query_unref (query);
+
+ if (!pull_mode)
+ goto typefind_push;
+
+ if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE))
+ goto typefind_push;
+
+ return TRUE;
+
typefind_push:
{
- start_typefinding (typefind);
return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
}
}
diff --git a/plugins/elements/gsttypefindelement.h b/plugins/elements/gsttypefindelement.h
index 5c34758eac..4a363698b7 100644
--- a/plugins/elements/gsttypefindelement.h
+++ b/plugins/elements/gsttypefindelement.h
@@ -59,6 +59,11 @@ struct _GstTypeFindElement {
GList * cached_events;
GstCaps * force_caps;
+
+ /* Only used when driving the pipeline */
+ gboolean need_segment;
+ GstSegment segment;
+ guint64 offset;
};
struct _GstTypeFindElementClass {