summaryrefslogtreecommitdiff
authorTim-Philipp Müller <tim.muller@collabora.co.uk>2009-03-31 16:52:44 (GMT)
committer Tim-Philipp Müller <tim.muller@collabora.co.uk>2009-03-31 17:30:57 (GMT)
commitfc8c5cba157c67b36e0c8f8e1e172988826ba308 (patch) (side-by-side diff)
treec7c5a55e2bc274e04fbad3a795a7562f1a17b16b
parentdfe96ce6188693fe3bef16407bb0b87b85afbbe9 (diff)
downloadgst-plugins-base-fc8c5cba157c67b36e0c8f8e1e172988826ba308.zip
gst-plugins-base-fc8c5cba157c67b36e0c8f8e1e172988826ba308.tar.gz
rtspconnection: make gst_rtsp_watch_queue_message() thread-safe
People might queue messages from a thread other than the thread in which the main context which this watch is attached is iterated from, so use a GAsyncQueue instead of a GList, so g_list_append() doesn't trample over list nodes just freed in the other thread. This just fixes issues I've had with gst-rtsp-server. We might need more locking in various places here.
Diffstat (more/less context) (ignore whitespace changes)
-rw-r--r--gst-libs/gst/rtsp/gstrtspconnection.c40
1 files changed, 23 insertions, 17 deletions
diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c
index 4ae4a80..a694383 100644
--- a/gst-libs/gst/rtsp/gstrtspconnection.c
+++ b/gst-libs/gst/rtsp/gstrtspconnection.c
@@ -2646,7 +2646,7 @@ struct _GstRTSPWatch
gboolean write_added;
/* queued message for transmission */
- GList *messages;
+ GAsyncQueue *messages;
guint8 *write_data;
guint write_off;
guint write_len;
@@ -2733,18 +2733,17 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
if (watch->write_data == NULL) {
GstRTSPRec *data;
- if (!watch->messages)
+ /* get a new message from the queue */
+ data = g_async_queue_try_pop (watch->messages);
+ if (data == NULL)
goto done;
- /* no data, get a new message from the queue */
- data = watch->messages->data;
- watch->messages = g_list_delete_link (watch->messages, watch->messages);
-
watch->write_off = 0;
watch->write_len = data->str->len;
watch->write_data = (guint8 *) g_string_free (data->str, FALSE);
watch->write_cseq = data->cseq;
+ data->str = NULL;
g_slice_free (GstRTSPRec, data);
}
@@ -2759,7 +2758,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data);
done:
- if (watch->messages == NULL && watch->write_added) {
+ if (g_async_queue_length (watch->messages) == 0 && watch->write_added) {
g_source_remove_poll ((GSource *) watch, &watch->writefd);
watch->write_added = FALSE;
watch->writefd.revents = 0;
@@ -2787,21 +2786,24 @@ error:
}
static void
+gst_rtsp_rec_free (gpointer data)
+{
+ GstRTSPRec *rec = data;
+
+ g_string_free (rec->str, TRUE);
+ rec->str = NULL;
+ g_slice_free (GstRTSPRec, rec);
+}
+
+static void
gst_rtsp_source_finalize (GSource * source)
{
GstRTSPWatch *watch = (GstRTSPWatch *) source;
- GList *walk;
build_reset (&watch->builder);
- for (walk = watch->messages; walk; walk = g_list_next (walk)) {
- GstRTSPRec *data = walk->data;
-
- g_string_free (data->str, TRUE);
- g_slice_free (GstRTSPRec, data);
- }
- g_list_free (watch->messages);
- g_free (watch->write_data);
+ g_async_queue_unref (watch->messages);
+ watch->messages = NULL;
if (watch->notify)
watch->notify (watch->user_data);
@@ -2851,6 +2853,8 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
result->conn = conn;
result->builder.state = STATE_START;
+ result->messages = g_async_queue_new_full (gst_rtsp_rec_free);
+
result->readfd.fd = -1;
result->writefd.fd = -1;
@@ -2943,8 +2947,10 @@ queue_response (GstRTSPWatch * watch, GString * str, guint cseq)
data->cseq = cseq;
/* add the record to a queue. FIXME we would like to have an upper limit here */
- watch->messages = g_list_append (watch->messages, data);
+ g_async_queue_push (watch->messages, data);
+ /* FIXME: does the following need to be made thread-safe? (queue_response
+ * might be called from a streaming thread, like appsink's render function) */
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {