summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathieu Duponchelle <mathieu@centricular.com>2020-02-06 22:46:18 +0100
committerMathieu Duponchelle <mduponchelle1@gmail.com>2020-02-24 20:24:29 +0000
commit54b6b3bcab504b28ce48c557ed735871e9c1bdc9 (patch)
tree824dfb1dbc58780e877c5203a1b69b76e2cf79f1
parent50ecbb15965639c89e5c127e749e1e2eef5d0302 (diff)
rtsp-stream: marshal calls to send_tcp_message to a single thread
In order to address the race condition pointed out at https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/merge_requests/108#note_403579 we get rid of the send thread pool, and instead spawn and manage a single thread to pull samples from app sinks and add them to the transport's backlogs. Additionally, we now also always go through the backlogs in order to simplify the logic.
-rw-r--r--gst/rtsp-server/rtsp-stream.c218
1 files changed, 127 insertions, 91 deletions
diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c
index 7fd171d..dc484e3 100644
--- a/gst/rtsp-server/rtsp-stream.c
+++ b/gst/rtsp-server/rtsp-stream.c
@@ -190,6 +190,18 @@ struct _GstRTSPStreamPrivate
gint dscp_qos;
+ /* Sending logic for TCP */
+ GThread *send_thread;
+ GCond send_cond;
+ GMutex send_lock;
+ /* @send_lock is released when pushing data out, we use
+ * a cookie to decide whether we should wait on @send_cond
+ * before checking the transports' backlogs again
+ */
+ guint send_cookie;
+ /* Used to control shutdown of @send_thread */
+ gboolean continue_sending;
+
/* stream blocking */
gulong blocked_id[2];
gboolean blocking;
@@ -314,6 +326,11 @@ gst_rtsp_stream_init (GstRTSPStream * stream)
g_mutex_init (&priv->lock);
+ priv->continue_sending = TRUE;
+ priv->send_cookie = 0;
+ g_cond_init (&priv->send_cond);
+ g_mutex_init (&priv->send_lock);
+
priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
NULL, (GDestroyNotify) gst_caps_unref);
priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
@@ -400,6 +417,9 @@ gst_rtsp_stream_finalize (GObject * obj)
g_hash_table_unref (priv->keys);
g_hash_table_destroy (priv->ptmap);
+ g_mutex_clear (&priv->send_lock);
+ g_cond_clear (&priv->send_cond);
+
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
}
@@ -2548,6 +2568,33 @@ ensure_cached_transports (GstRTSPStream * stream)
}
}
+/* Must be called *without* priv->lock */
+static void
+check_transport_backlog (GstRTSPStream * stream, GstRTSPStreamTransport * trans)
+{
+ gst_rtsp_stream_transport_lock_backlog (trans);
+
+ if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
+ GstBuffer *buffer;
+ GstBufferList *buffer_list;
+ gboolean is_rtp;
+ gboolean popped;
+
+ popped =
+ gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list,
+ &is_rtp);
+
+ g_assert (popped == TRUE);
+
+ push_data (stream, trans, buffer, buffer_list, is_rtp);
+
+ gst_clear_buffer (&buffer);
+ gst_clear_buffer_list (&buffer_list);
+ }
+
+ gst_rtsp_stream_transport_unlock_backlog (trans);
+}
+
/* Must be called with priv->lock */
static void
send_tcp_message (GstRTSPStream * stream, gint idx)
@@ -2598,59 +2645,67 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
if (transports)
g_ptr_array_ref (transports);
- g_mutex_unlock (&priv->lock);
-
if (transports) {
gint index;
for (index = 0; index < transports->len; index++) {
GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
+ GstBuffer *buf_ref = NULL;
+ GstBufferList *buflist_ref = NULL;
gst_rtsp_stream_transport_lock_backlog (tr);
- if (gst_rtsp_stream_transport_backlog_is_empty (tr)
- && !gst_rtsp_stream_transport_check_back_pressure (tr, idx)) {
- push_data (stream, tr, buffer, buffer_list, is_rtp);
- } else {
- GstBuffer *buf_ref = NULL;
- GstBufferList *buflist_ref = NULL;
+ if (buffer)
+ buf_ref = gst_buffer_ref (buffer);
+ if (buffer_list)
+ buflist_ref = gst_buffer_list_ref (buffer_list);
- if (buffer)
- buf_ref = gst_buffer_ref (buffer);
- if (buffer_list)
- buflist_ref = gst_buffer_list_ref (buffer_list);
-
- if (!gst_rtsp_stream_transport_backlog_push (tr,
- buf_ref, buflist_ref, is_rtp)) {
- GST_WARNING_OBJECT (stream,
- "Dropping slow transport %" GST_PTR_FORMAT, tr);
- g_mutex_lock (&priv->lock);
- update_transport (stream, tr, FALSE);
- g_mutex_unlock (&priv->lock);
- }
+ if (!gst_rtsp_stream_transport_backlog_push (tr,
+ buf_ref, buflist_ref, is_rtp)) {
+ GST_ERROR_OBJECT (stream,
+ "Dropping slow transport %" GST_PTR_FORMAT, tr);
+ update_transport (stream, tr, FALSE);
}
gst_rtsp_stream_transport_unlock_backlog (tr);
}
- g_ptr_array_unref (transports);
}
gst_sample_unref (sample);
+ g_mutex_unlock (&priv->lock);
+
+ if (transports) {
+ gint index;
+
+ for (index = 0; index < transports->len; index++) {
+ GstRTSPStreamTransport *tr = g_ptr_array_index (transports, index);
+
+ check_transport_backlog (stream, tr);
+ }
+ g_ptr_array_unref (transports);
+ }
+
g_mutex_lock (&priv->lock);
}
-static void
-send_thread_main (gpointer data, gpointer user_data)
+static gpointer
+send_func (GstRTSPStream * stream)
{
- GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
- gint idx;
- gint i;
+ gboolean cont = TRUE;
- g_mutex_lock (&priv->lock);
+ g_mutex_lock (&priv->send_lock);
+
+ while (cont) {
+ int i;
+ int idx = -1;
+ guint cookie;
+
+ cookie = priv->send_cookie;
+ g_mutex_unlock (&priv->send_lock);
+
+ g_mutex_lock (&priv->lock);
- do {
- idx = -1;
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
@@ -2660,12 +2715,22 @@ send_thread_main (gpointer data, gpointer user_data)
}
}
- if (idx != -1)
+ if (idx != -1) {
send_tcp_message (stream, idx);
- } while (idx != -1);
+ }
- GST_DEBUG_OBJECT (stream, "send thread done");
- g_mutex_unlock (&priv->lock);
+ g_mutex_unlock (&priv->lock);
+
+ g_mutex_lock (&priv->send_lock);
+ while (cookie == priv->send_cookie) {
+ g_cond_wait (&priv->send_cond, &priv->send_lock);
+ }
+ cont = priv->continue_sending;
+ }
+
+ g_mutex_unlock (&priv->send_lock);
+
+ return NULL;
}
static GstFlowReturn
@@ -2674,28 +2739,27 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
int i;
- int idx = -1;
g_mutex_lock (&priv->lock);
- if (priv->send_pool == NULL) {
- GST_DEBUG_OBJECT (stream, "create thread pool");
- priv->send_pool =
- g_thread_pool_new (send_thread_main, user_data, 1, TRUE, NULL);
- }
-
- for (i = 0; i < 2; i++)
+ for (i = 0; i < 2; i++) {
if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
priv->have_buffer[i] = TRUE;
- idx = i;
break;
}
+ }
- if (idx != -1)
- send_tcp_message (stream, idx);
+ if (priv->send_thread == NULL) {
+ priv->send_thread = g_thread_new (NULL, (GThreadFunc) send_func, user_data);
+ }
g_mutex_unlock (&priv->lock);
+ g_mutex_lock (&priv->send_lock);
+ priv->send_cookie++;
+ g_cond_signal (&priv->send_cond);
+ g_mutex_unlock (&priv->send_lock);
+
return GST_FLOW_OK;
}
@@ -3904,6 +3968,16 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
priv = stream->priv;
+ g_mutex_lock (&priv->send_lock);
+ priv->continue_sending = FALSE;
+ priv->send_cookie++;
+ g_cond_signal (&priv->send_cond);
+ g_mutex_unlock (&priv->send_lock);
+
+ if (priv->send_thread) {
+ g_thread_join (priv->send_thread);
+ }
+
g_mutex_lock (&priv->lock);
if (priv->joined_bin == NULL)
goto was_not_joined;
@@ -4557,55 +4631,17 @@ mcast_error:
static void
on_message_sent (GstRTSPStreamTransport * trans, gpointer user_data)
{
- GstRTSPStream *stream = user_data;
+ GstRTSPStream *stream = GST_RTSP_STREAM (user_data);
GstRTSPStreamPrivate *priv = stream->priv;
- gint idx = -1;
- gint i;
GST_DEBUG_OBJECT (stream, "message send complete");
- gst_rtsp_stream_transport_lock_backlog (trans);
-
- if (!gst_rtsp_stream_transport_backlog_is_empty (trans)) {
- GstBuffer *buffer;
- GstBufferList *buffer_list;
- gboolean is_rtp;
- gboolean popped;
+ check_transport_backlog (stream, trans);
- popped =
- gst_rtsp_stream_transport_backlog_pop (trans, &buffer, &buffer_list,
- &is_rtp);
-
- g_assert (popped == TRUE);
-
- push_data (stream, trans, buffer, buffer_list, is_rtp);
-
- gst_clear_buffer (&buffer);
- gst_clear_buffer_list (&buffer_list);
- } else {
- g_mutex_lock (&priv->lock);
- /* iterate from 1 and down, so we prioritize RTCP over RTP */
- for (i = 1; i >= 0; i--) {
- if (priv->have_buffer[i]) {
- /* send message */
- idx = i;
- break;
- }
- }
-
- if (idx != -1) {
- gint dummy;
-
- if (priv->send_pool) {
- GST_DEBUG_OBJECT (stream, "start thread");
- g_thread_pool_push (priv->send_pool, &dummy, NULL);
- }
- }
-
- g_mutex_unlock (&priv->lock);
- }
-
- gst_rtsp_stream_transport_unlock_backlog (trans);
+ g_mutex_lock (&priv->send_lock);
+ priv->send_cookie++;
+ g_cond_signal (&priv->send_cond);
+ g_mutex_unlock (&priv->send_lock);
}
/**
@@ -6027,8 +6063,8 @@ gst_rtsp_stream_set_rate_control (GstRTSPStream * stream, gboolean enabled)
if (stream->priv->appsink[0])
g_object_set (stream->priv->appsink[0], "sync", enabled, NULL);
if (stream->priv->payloader
- && g_object_class_find_property (G_OBJECT_GET_CLASS (stream->
- priv->payloader), "onvif-no-rate-control"))
+ && g_object_class_find_property (G_OBJECT_GET_CLASS (stream->priv->
+ payloader), "onvif-no-rate-control"))
g_object_set (stream->priv->payloader, "onvif-no-rate-control", !enabled,
NULL);
if (stream->priv->session) {