From fc8c5cba157c67b36e0c8f8e1e172988826ba308 Mon Sep 17 00:00:00 2001 From: Tim-Philipp Müller Date: Tue, 31 Mar 2009 17:52:44 +0100 Subject: rtspconnection: make gst_rtsp_watch_queue_message() thread-safe People might queue messages from a thread other than the thread in which the main context which this watch is attached is iterated from, so use a GAsyncQueue instead of a GList, so g_list_append() doesn't trample over list nodes just freed in the other thread. This just fixes issues I've had with gst-rtsp-server. We might need more locking in various places here. --- gst-libs/gst/rtsp/gstrtspconnection.c | 40 ++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 4ae4a80e2..a6943833a 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -2646,7 +2646,7 @@ struct _GstRTSPWatch gboolean write_added; /* queued message for transmission */ - GList *messages; + GAsyncQueue *messages; guint8 *write_data; guint write_off; guint write_len; @@ -2733,18 +2733,17 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback, if (watch->write_data == NULL) { GstRTSPRec *data; - if (!watch->messages) + /* get a new message from the queue */ + data = g_async_queue_try_pop (watch->messages); + if (data == NULL) goto done; - /* no data, get a new message from the queue */ - data = watch->messages->data; - watch->messages = g_list_delete_link (watch->messages, watch->messages); - watch->write_off = 0; watch->write_len = data->str->len; watch->write_data = (guint8 *) g_string_free (data->str, FALSE); watch->write_cseq = data->cseq; + data->str = NULL; g_slice_free (GstRTSPRec, data); } @@ -2759,7 +2758,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback, watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data); done: - if (watch->messages == NULL && watch->write_added) { + if (g_async_queue_length (watch->messages) == 0 && watch->write_added) { g_source_remove_poll ((GSource *) watch, &watch->writefd); watch->write_added = FALSE; watch->writefd.revents = 0; @@ -2786,22 +2785,25 @@ error: } } +static void +gst_rtsp_rec_free (gpointer data) +{ + GstRTSPRec *rec = data; + + g_string_free (rec->str, TRUE); + rec->str = NULL; + g_slice_free (GstRTSPRec, rec); +} + static void gst_rtsp_source_finalize (GSource * source) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GList *walk; build_reset (&watch->builder); - for (walk = watch->messages; walk; walk = g_list_next (walk)) { - GstRTSPRec *data = walk->data; - - g_string_free (data->str, TRUE); - g_slice_free (GstRTSPRec, data); - } - g_list_free (watch->messages); - g_free (watch->write_data); + g_async_queue_unref (watch->messages); + watch->messages = NULL; if (watch->notify) watch->notify (watch->user_data); @@ -2851,6 +2853,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->conn = conn; result->builder.state = STATE_START; + result->messages = g_async_queue_new_full (gst_rtsp_rec_free); + result->readfd.fd = -1; result->writefd.fd = -1; @@ -2943,8 +2947,10 @@ queue_response (GstRTSPWatch * watch, GString * str, guint cseq) data->cseq = cseq; /* add the record to a queue. FIXME we would like to have an upper limit here */ - watch->messages = g_list_append (watch->messages, data); + g_async_queue_push (watch->messages, data); + /* FIXME: does the following need to be made thread-safe? (queue_response + * might be called from a streaming thread, like appsink's render function) */ /* make sure the main context will now also check for writability on the * socket */ if (!watch->write_added) { -- cgit v1.2.3