summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThiago Santos <ts.santos@sisa.samsung.com>2013-11-22 16:18:08 -0300
committerThiago Santos <ts.santos@sisa.samsung.com>2013-11-22 18:35:48 -0300
commit920bf7e3c5c60b7f312533f57b57724976d3c82b (patch)
treec317bdae0021218716f83521ddaa13d4737eed0a
parent6d0252b34a27208a716e5c84a47fb119d3f465b1 (diff)
mssdemux: prevent deadlock by pushing buffers from different threadsmssdemux-multilanguage
When streams are switching, the old active stream can be blocked because input-selector will block not-linked streams. In case the mssdemux's stream loop is blocked pushing a buffer to a full queue downstream it will never unblock as the queue will not drain (input-selector is blocking). In this scenario, stream switching will deadlock as input-selector is waiting for the newly active stream data and the stream_loop that would push this data is blocked waiting for input-selector. To solve this issue, whenever an stream is reactivated on a reconfigure it will enter into the 'catch up mode', in this mode it can push buffers from its download thread until a certain timestamp. This works because this timestamp will always be behind or equal to the maximum timestamp pushed for all streams, after pushing data for this timestamp, the stream will go back to default and be pushed sequentially from the main streaming thread. By this time, the input-selector should have already released the thread.
-rw-r--r--ext/smoothstreaming/gstmssdemux.c131
-rw-r--r--ext/smoothstreaming/gstmssdemux.h1
2 files changed, 96 insertions, 36 deletions
diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c
index 5ce203c70..c778ac9a2 100644
--- a/ext/smoothstreaming/gstmssdemux.c
+++ b/ext/smoothstreaming/gstmssdemux.c
@@ -621,6 +621,10 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
GstMssDemuxStream *stream = iter->data;
stream->eos = FALSE;
+ if (flags & GST_SEEK_FLAG_FLUSH) {
+ stream->catch_up_timestamp = 0;
+ stream->last_ret = GST_FLOW_OK;
+ }
gst_data_queue_flush (stream->dataqueue);
gst_event_replace (&stream->pending_newsegment, newsegment);
}
@@ -1073,13 +1077,13 @@ gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
static GstFlowReturn
gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
- gboolean * buffer_downloaded)
+ GstBuffer ** _buffer)
{
GstMssDemux *mssdemux = stream->parent;
gchar *path;
gchar *url;
GstFragment *fragment;
- GstBuffer *_buffer;
+ GstBuffer *buffer;
GstFlowReturn ret = GST_FLOW_OK;
guint64 before_download, after_download;
@@ -1132,22 +1136,19 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
return GST_FLOW_ERROR;
}
- _buffer = gst_fragment_get_buffer (fragment);
- _buffer = gst_buffer_make_writable (_buffer);
- GST_BUFFER_TIMESTAMP (_buffer) =
+ buffer = gst_fragment_get_buffer (fragment);
+ *_buffer = buffer = gst_buffer_make_writable (buffer);
+ GST_BUFFER_TIMESTAMP (buffer) =
gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
- GST_BUFFER_DURATION (_buffer) =
+ GST_BUFFER_DURATION (buffer) =
gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
g_object_unref (fragment);
- if (buffer_downloaded)
- *buffer_downloaded = _buffer != NULL;
-
after_download = g_get_real_time ();
if (_buffer) {
#ifndef GST_DISABLE_GST_DEBUG
- guint64 bitrate = (8 * gst_buffer_get_size (_buffer) * 1000000LLU) /
+ guint64 bitrate = (8 * gst_buffer_get_size (buffer) * 1000000LLU) /
(after_download - before_download);
#endif
@@ -1155,16 +1156,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
"Measured download bitrate: %s %" G_GUINT64_FORMAT " bps",
GST_PAD_NAME (stream->pad), bitrate);
gst_download_rate_add_rate (&stream->download_rate,
- gst_buffer_get_size (_buffer),
+ gst_buffer_get_size (buffer),
1000 * (after_download - before_download));
-
- GST_DEBUG_OBJECT (mssdemux,
- "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
- " Duration: %" GST_TIME_FORMAT,
- stream, GST_PAD_NAME (stream->pad),
- GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
- gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
}
return ret;
@@ -1189,8 +1182,9 @@ static void
gst_mss_demux_download_loop (GstMssDemuxStream * stream)
{
GstMssDemux *mssdemux = stream->parent;
- gboolean buffer_downloaded = FALSE;
GstFlowReturn ret;
+ GstBuffer *buffer = NULL;
+ gboolean buffer_downloaded = FALSE;
GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
@@ -1198,6 +1192,7 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
if (G_UNLIKELY (stream->restart_download)) {
GSList *iter;
GstClockTime cur, ts;
+ GstClockTime min_ts, max_ts;
gint64 pos;
GstEvent *gap;
@@ -1215,26 +1210,34 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
GST_TIME_FORMAT, GST_TIME_ARGS (ts));
} else {
ts = GST_CLOCK_TIME_NONE;
+ }
- GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, "
- "failling back to looking at other pads");
+ GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, "
+ "failling back to looking at other pads");
- /* enable this stream again,
- * seek to the smallest timestamps on all active streams */
- for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
- GstMssDemuxStream *other = iter->data;
+ min_ts = stream->next_timestamp;
+ max_ts = 0;
- /* TODO it might be possible that the stream that was previously
- * active has already switched into not-lined */
- if (other->last_ret != GST_FLOW_NOT_LINKED) {
- ts = MIN (ts, other->next_timestamp);
- }
+ /* enable this stream again,
+ * seek to the smallest timestamps on all active streams */
+ for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+ GstMssDemuxStream *other = iter->data;
+
+ /* TODO it might be possible that the stream that was previously
+ * active has already switched into not-linked */
+ if (other->last_ret != GST_FLOW_NOT_LINKED) {
+ min_ts = MIN (min_ts, other->next_timestamp);
}
+ max_ts = MAX (max_ts, other->next_timestamp);
}
+ /* we might have already pushed this data */
+ ts = MAX (ts, stream->next_timestamp);
+
GST_DEBUG_OBJECT (mssdemux, "Restarting stream %p %s:%s at "
- "position %" GST_TIME_FORMAT, stream,
- GST_DEBUG_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
+ "position %" GST_TIME_FORMAT ", catching up until %" GST_TIME_FORMAT,
+ stream, GST_DEBUG_PAD_NAME (stream->pad), GST_TIME_ARGS (ts),
+ GST_TIME_ARGS (max_ts));
if (GST_CLOCK_TIME_IS_VALID (ts)) {
gst_mss_stream_seek (stream->manifest_stream, ts);
@@ -1244,7 +1247,25 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
gst_pad_push_event (stream->pad, gap);
}
}
- stream->last_ret = GST_FLOW_OK;
+
+ /* This stream might be entering into catching up mode,
+ * meaning that it will push buffers from this same download thread
+ * until it reaches 'catch_up_timestamp'.
+ *
+ * The reason for this is that in case of stream switching, the other
+ * stream that was previously active might be blocking the stream_loop
+ * in case it is ahead enough that all queues are filled.
+ * In this case, it is possible that a downstream input-selector is
+ * blocking waiting for the currently active stream to reach the
+ * same position of the old linked stream because of the 'sync-streams'
+ * behavior.
+ *
+ * We can push from this thread up to 'catch_up_timestamp' as all other
+ * streams should be around the same timestamp.
+ */
+ stream->catch_up_timestamp = max_ts;
+ stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+
stream->restart_download = FALSE;
gst_task_start (mssdemux->stream_task);
GST_MSS_DEMUX_STREAM_UNLOCK (stream);
@@ -1256,10 +1277,48 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream)
GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
GST_OBJECT_UNLOCK (mssdemux);
- ret = gst_mss_demux_stream_download_fragment (stream, &buffer_downloaded);
+ ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
+ buffer_downloaded = buffer != NULL;
- if (stream->cancelled)
+ if (stream->cancelled) {
+ if (buffer)
+ gst_buffer_unref (buffer);
goto cancelled;
+ }
+
+ if (buffer) {
+ gboolean catch_up = FALSE;
+
+ /* Check if this stream is on catch up mode */
+ if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) {
+ GST_DEBUG_OBJECT (mssdemux,
+ "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream->catch_up_timestamp),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
+ if (GST_BUFFER_TIMESTAMP (buffer) < stream->catch_up_timestamp) {
+ catch_up = TRUE;
+ } else {
+ stream->last_ret = GST_FLOW_OK;
+ }
+ }
+
+ GST_DEBUG_OBJECT (mssdemux,
+ "%s buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
+ " Duration: %" GST_TIME_FORMAT,
+ catch_up ? "Catch up push for" : "Storing", stream,
+ GST_PAD_NAME (stream->pad),
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+ GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+
+ if (catch_up) {
+ ret = stream->last_ret = gst_pad_push (stream->pad, buffer);
+ if (G_LIKELY (ret == GST_FLOW_OK))
+ stream->last_ret = GST_FLOW_CUSTOM_SUCCESS;
+ /* TODO handle return */
+ } else {
+ gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer));
+ }
+ }
switch (ret) {
case GST_FLOW_OK:
diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h
index d651b8050..25bedc0b6 100644
--- a/ext/smoothstreaming/gstmssdemux.h
+++ b/ext/smoothstreaming/gstmssdemux.h
@@ -72,6 +72,7 @@ struct _GstMssDemuxStream {
GstEvent *pending_newsegment;
GstClockTime next_timestamp;
+ GstClockTime catch_up_timestamp;
/* Downloading task */
GstTask *download_task;