summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2014-07-10 11:32:20 +0200
committerWim Taymans <wtaymans@redhat.com>2014-07-10 11:36:55 +0200
commit945c93fde09b461d4b810846e2a9443f915e6507 (patch)
treec3598f6eb9c3bf435d034a3b57f52327bdf2e934
parent6543082d2b6eb75da4bddc5b43aa24e759123fae (diff)
filter: Release lock in filter functions
Release the object lock before calling the filter functions. We need to keep a cookie to detect when the list changed during the filter callback. We also keep a hashtable to make sure we only call the filter function once for each object in case of concurrent modification. Fixes https://bugzilla.gnome.org/show_bug.cgi?id=732950
-rw-r--r--gst/rtsp-server/rtsp-client.c40
-rw-r--r--gst/rtsp-server/rtsp-server.c35
-rw-r--r--gst/rtsp-server/rtsp-session-pool.c103
-rw-r--r--gst/rtsp-server/rtsp-session.c39
-rw-r--r--gst/rtsp-server/rtsp-stream.c38
5 files changed, 194 insertions, 61 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c
index 43c671a..0fd751a 100644
--- a/gst/rtsp-server/rtsp-client.c
+++ b/gst/rtsp-server/rtsp-client.c
@@ -83,6 +83,7 @@ struct _GstRTSPClientPrivate
GList *transports;
GList *sessions;
+ guint sessions_cookie;
gboolean drop_backlog;
};
@@ -305,6 +306,7 @@ client_watch_session (GstRTSPClient * client, GstRTSPSession * session)
GST_INFO ("watching session %p", session);
priv->sessions = g_list_prepend (priv->sessions, g_object_ref (session));
+ priv->sessions_cookie++;
/* connect removed session handler, it will be disconnected when the last
* session gets removed */
@@ -334,12 +336,12 @@ client_unwatch_session (GstRTSPClient * client, GstRTSPSession * session,
}
priv->sessions = g_list_delete_link (priv->sessions, link);
+ priv->sessions_cookie++;
/* if this was the last session, disconnect the handler.
* This will also drop the extra client ref */
if (!priv->sessions) {
- g_signal_handler_disconnect (priv->session_pool,
- priv->session_removed_id);
+ g_signal_handler_disconnect (priv->session_pool, priv->session_removed_id);
priv->session_removed_id = 0;
}
@@ -3455,29 +3457,50 @@ gst_rtsp_client_session_filter (GstRTSPClient * client,
{
GstRTSPClientPrivate *priv;
GList *result, *walk, *next;
+ GHashTable *visited;
+ guint cookie;
g_return_val_if_fail (GST_IS_RTSP_CLIENT (client), NULL);
priv = client->priv;
result = NULL;
+ if (func)
+ visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
+restart:
+ cookie = priv->sessions_cookie;
for (walk = priv->sessions; walk; walk = next) {
GstRTSPSession *sess = walk->data;
GstRTSPFilterResult res;
+ gboolean changed;
next = g_list_next (walk);
- if (func)
+ if (func) {
+ /* only visit each session once */
+ if (g_hash_table_contains (visited, sess))
+ continue;
+
+ g_hash_table_add (visited, g_object_ref (sess));
+ g_mutex_unlock (&priv->lock);
+
res = func (client, sess, user_data);
- else
+
+ g_mutex_lock (&priv->lock);
+ } else
res = GST_RTSP_FILTER_REF;
+ changed = (cookie != priv->sessions_cookie);
+
switch (res) {
case GST_RTSP_FILTER_REMOVE:
- /* stop watching the session and pretent it went away */
- client_unwatch_session (client, sess, walk);
+ /* stop watching the session and pretend it went away, if the list was
+ * changed, we can't use the current list position, try to see if we
+ * still have the session */
+ client_unwatch_session (client, sess, changed ? NULL : walk);
+ cookie = priv->sessions_cookie;
break;
case GST_RTSP_FILTER_REF:
result = g_list_prepend (result, g_object_ref (sess));
@@ -3486,8 +3509,13 @@ gst_rtsp_client_session_filter (GstRTSPClient * client,
default:
break;
}
+ if (changed)
+ goto restart;
}
g_mutex_unlock (&priv->lock);
+ if (func)
+ g_hash_table_unref (visited);
+
return result;
}
diff --git a/gst/rtsp-server/rtsp-server.c b/gst/rtsp-server/rtsp-server.c
index a5ea95a..84a7d50 100644
--- a/gst/rtsp-server/rtsp-server.c
+++ b/gst/rtsp-server/rtsp-server.c
@@ -90,6 +90,7 @@ struct _GstRTSPServerPrivate
/* the clients that are connected */
GList *clients;
+ guint clients_cookie;
};
#define DEFAULT_ADDRESS "0.0.0.0"
@@ -999,6 +1000,7 @@ unmanage_client (GstRTSPClient * client, ClientContext * ctx)
GST_RTSP_SERVER_LOCK (server);
priv->clients = g_list_remove (priv->clients, ctx);
+ priv->clients_cookie++;
GST_RTSP_SERVER_UNLOCK (server);
if (ctx->thread) {
@@ -1050,6 +1052,7 @@ manage_client (GstRTSPServer * server, GstRTSPClient * client)
g_signal_connect (client, "closed", (GCallback) unmanage_client, cctx);
priv->clients = g_list_prepend (priv->clients, cctx);
+ priv->clients_cookie++;
gst_rtsp_client_attach (client, mainctx);
@@ -1361,38 +1364,62 @@ gst_rtsp_server_client_filter (GstRTSPServer * server,
{
GstRTSPServerPrivate *priv;
GList *result, *walk, *next;
+ GHashTable *visited;
+ guint cookie;
g_return_val_if_fail (GST_IS_RTSP_SERVER (server), NULL);
priv = server->priv;
result = NULL;
+ if (func)
+ visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
GST_RTSP_SERVER_LOCK (server);
+restart:
+ cookie = priv->clients_cookie;
for (walk = priv->clients; walk; walk = next) {
ClientContext *cctx = walk->data;
+ GstRTSPClient *client = cctx->client;
GstRTSPFilterResult res;
+ gboolean changed;
next = g_list_next (walk);
- if (func)
- res = func (server, cctx->client, user_data);
- else
+ if (func) {
+ /* only visit each media once */
+ if (g_hash_table_contains (visited, client))
+ continue;
+
+ g_hash_table_add (visited, g_object_ref (client));
+ GST_RTSP_SERVER_UNLOCK (server);
+
+ res = func (server, client, user_data);
+
+ GST_RTSP_SERVER_LOCK (server);
+ } else
res = GST_RTSP_FILTER_REF;
+ changed = (cookie != priv->clients_cookie);
+
switch (res) {
case GST_RTSP_FILTER_REMOVE:
/* remove client, FIXME */
break;
case GST_RTSP_FILTER_REF:
- result = g_list_prepend (result, g_object_ref (cctx->client));
+ result = g_list_prepend (result, g_object_ref (client));
break;
case GST_RTSP_FILTER_KEEP:
default:
break;
}
+ if (changed)
+ goto restart;
}
GST_RTSP_SERVER_UNLOCK (server);
+ if (func)
+ g_hash_table_unref (visited);
+
return result;
}
diff --git a/gst/rtsp-server/rtsp-session-pool.c b/gst/rtsp-server/rtsp-session-pool.c
index a7aa5b2..7699196 100644
--- a/gst/rtsp-server/rtsp-session-pool.c
+++ b/gst/rtsp-server/rtsp-session-pool.c
@@ -50,6 +50,7 @@ struct _GstRTSPSessionPoolPrivate
GMutex lock; /* protects everything in this struct */
guint max_sessions;
GHashTable *sessions;
+ guint sessions_cookie;
};
#define DEFAULT_MAX_SESSIONS 0
@@ -394,6 +395,7 @@ gst_rtsp_session_pool_create (GstRTSPSessionPool * pool)
g_object_ref (result);
g_hash_table_insert (priv->sessions,
(gchar *) gst_rtsp_session_get_sessionid (result), result);
+ priv->sessions_cookie++;
}
g_mutex_unlock (&priv->lock);
@@ -455,6 +457,7 @@ gst_rtsp_session_pool_remove (GstRTSPSessionPool * pool, GstRTSPSession * sess)
g_hash_table_remove (priv->sessions,
gst_rtsp_session_get_sessionid (sess));
if (found) {
+ priv->sessions_cookie++;
g_signal_emit (pool, gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED],
0, sess);
}
@@ -511,44 +514,13 @@ gst_rtsp_session_pool_cleanup (GstRTSPSessionPool * pool)
result =
g_hash_table_foreach_remove (priv->sessions, (GHRFunc) cleanup_func,
&data);
+ if (result > 0)
+ priv->sessions_cookie++;
g_mutex_unlock (&priv->lock);
return result;
}
-typedef struct
-{
- GstRTSPSessionPool *pool;
- GstRTSPSessionPoolFilterFunc func;
- gpointer user_data;
- GList *list;
-} FilterData;
-
-static gboolean
-filter_func (gchar * sessionid, GstRTSPSession * sess, FilterData * data)
-{
- GstRTSPFilterResult res;
-
- if (data->func)
- res = data->func (data->pool, sess, data->user_data);
- else
- res = GST_RTSP_FILTER_REF;
-
- switch (res) {
- case GST_RTSP_FILTER_REMOVE:
- g_signal_emit (data->pool,
- gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, sess);
- return TRUE;
- case GST_RTSP_FILTER_REF:
- /* keep ref */
- data->list = g_list_prepend (data->list, g_object_ref (sess));
- /* fallthrough */
- default:
- case GST_RTSP_FILTER_KEEP:
- return FALSE;
- }
-}
-
/**
* gst_rtsp_session_pool_filter:
* @pool: a #GstRTSPSessionPool
@@ -580,22 +552,73 @@ gst_rtsp_session_pool_filter (GstRTSPSessionPool * pool,
GstRTSPSessionPoolFilterFunc func, gpointer user_data)
{
GstRTSPSessionPoolPrivate *priv;
- FilterData data;
+ GHashTableIter iter;
+ gpointer key, value;
+ GList *result;
+ GHashTable *visited;
+ guint cookie;
g_return_val_if_fail (GST_IS_RTSP_SESSION_POOL (pool), NULL);
priv = pool->priv;
- data.pool = pool;
- data.func = func;
- data.user_data = user_data;
- data.list = NULL;
+ result = NULL;
+ if (func)
+ visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
- g_hash_table_foreach_remove (priv->sessions, (GHRFunc) filter_func, &data);
+restart:
+ g_hash_table_iter_init (&iter, priv->sessions);
+ cookie = priv->sessions_cookie;
+ while (g_hash_table_iter_next (&iter, &key, &value)) {
+ GstRTSPSession *session = value;
+ GstRTSPFilterResult res;
+ gboolean changed;
+
+ if (func) {
+ /* only visit each session once */
+ if (g_hash_table_contains (visited, session))
+ continue;
+
+ g_hash_table_add (visited, g_object_ref (session));
+ g_mutex_unlock (&priv->lock);
+
+ res = func (pool, session, user_data);
+
+ g_mutex_lock (&priv->lock);
+ } else
+ res = GST_RTSP_FILTER_REF;
+
+ changed = (cookie != priv->sessions_cookie);
+
+ switch (res) {
+ case GST_RTSP_FILTER_REMOVE:
+ g_signal_emit (pool,
+ gst_rtsp_session_pool_signals[SIGNAL_SESSION_REMOVED], 0, session);
+
+ if (changed)
+ g_hash_table_remove (priv->sessions, key);
+ else
+ g_hash_table_iter_remove (&iter);
+ cookie = ++priv->sessions_cookie;
+ break;
+ case GST_RTSP_FILTER_REF:
+ /* keep ref */
+ result = g_list_prepend (result, g_object_ref (session));
+ break;
+ case GST_RTSP_FILTER_KEEP:
+ default:
+ break;
+ }
+ if (changed)
+ goto restart;
+ }
g_mutex_unlock (&priv->lock);
- return data.list;
+ if (func)
+ g_hash_table_unref (visited);
+
+ return result;
}
typedef struct
diff --git a/gst/rtsp-server/rtsp-session.c b/gst/rtsp-server/rtsp-session.c
index 3463b32..372746a 100644
--- a/gst/rtsp-server/rtsp-session.c
+++ b/gst/rtsp-server/rtsp-session.c
@@ -62,6 +62,7 @@ struct _GstRTSPSessionPrivate
gint expire_count;
GList *medias;
+ guint medias_cookie;
};
#undef DEBUG
@@ -238,6 +239,7 @@ gst_rtsp_session_manage_media (GstRTSPSession * sess, const gchar * path,
g_mutex_lock (&priv->lock);
priv->medias = g_list_prepend (priv->medias, result);
+ priv->medias_cookie++;
g_mutex_unlock (&priv->lock);
GST_INFO ("manage new media %p in session %p", media, result);
@@ -269,8 +271,10 @@ gst_rtsp_session_release_media (GstRTSPSession * sess,
g_mutex_lock (&priv->lock);
find = g_list_find (priv->medias, media);
- if (find)
+ if (find) {
priv->medias = g_list_delete_link (priv->medias, find);
+ priv->medias_cookie++;
+ }
more = (priv->medias != NULL);
g_mutex_unlock (&priv->lock);
@@ -359,29 +363,51 @@ gst_rtsp_session_filter (GstRTSPSession * sess,
{
GstRTSPSessionPrivate *priv;
GList *result, *walk, *next;
+ GHashTable *visited;
+ guint cookie;
g_return_val_if_fail (GST_IS_RTSP_SESSION (sess), NULL);
priv = sess->priv;
result = NULL;
+ if (func)
+ visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
+restart:
+ cookie = priv->medias_cookie;
for (walk = priv->medias; walk; walk = next) {
GstRTSPSessionMedia *media = walk->data;
GstRTSPFilterResult res;
+ gboolean changed;
next = g_list_next (walk);
- if (func)
+ if (func) {
+ /* only visit each media once */
+ if (g_hash_table_contains (visited, media))
+ continue;
+
+ g_hash_table_add (visited, g_object_ref (media));
+ g_mutex_unlock (&priv->lock);
+
res = func (sess, media, user_data);
- else
+
+ g_mutex_lock (&priv->lock);
+ } else
res = GST_RTSP_FILTER_REF;
+ changed = (cookie != priv->medias_cookie);
+
switch (res) {
case GST_RTSP_FILTER_REMOVE:
+ if (changed)
+ priv->medias = g_list_remove (priv->medias, media);
+ else
+ priv->medias = g_list_delete_link (priv->medias, walk);
+ cookie = ++priv->medias_cookie;
g_object_unref (media);
- priv->medias = g_list_delete_link (priv->medias, walk);
break;
case GST_RTSP_FILTER_REF:
result = g_list_prepend (result, g_object_ref (media));
@@ -390,9 +416,14 @@ gst_rtsp_session_filter (GstRTSPSession * sess,
default:
break;
}
+ if (changed)
+ goto restart;
}
g_mutex_unlock (&priv->lock);
+ if (func)
+ g_hash_table_unref (visited);
+
return result;
}
diff --git a/gst/rtsp-server/rtsp-stream.c b/gst/rtsp-server/rtsp-stream.c
index 02c8274..942b837 100644
--- a/gst/rtsp-server/rtsp-stream.c
+++ b/gst/rtsp-server/rtsp-stream.c
@@ -124,8 +124,9 @@ struct _GstRTSPStreamPrivate
/* transports we stream to */
guint n_active;
GList *transports;
- gboolean tr_changed;
+ guint transports_cookie;
GList *tr_cache;
+ guint tr_cache_cookie;
gint dscp_qos;
@@ -1503,13 +1504,13 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
g_mutex_lock (&priv->lock);
- if (priv->tr_changed) {
+ if (priv->tr_cache_cookie != priv->transports_cookie) {
clear_tr_cache (priv);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
}
- priv->tr_changed = FALSE;
+ priv->tr_cache_cookie = priv->transports_cookie;
}
g_mutex_unlock (&priv->lock);
@@ -2268,7 +2269,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
priv->transports = g_list_remove (priv->transports, trans);
}
- priv->tr_changed = TRUE;
+ priv->transports_cookie++;
break;
}
case GST_RTSP_LOWER_TRANS_TCP:
@@ -2279,7 +2280,7 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
GST_INFO ("removing TCP %s", tr->destination);
priv->transports = g_list_remove (priv->transports, trans);
}
- priv->tr_changed = TRUE;
+ priv->transports_cookie++;
break;
default:
goto unknown_transport;
@@ -2497,25 +2498,43 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
{
GstRTSPStreamPrivate *priv;
GList *result, *walk, *next;
+ GHashTable *visited;
+ guint cookie;
g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
priv = stream->priv;
result = NULL;
+ if (func)
+ visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
g_mutex_lock (&priv->lock);
+restart:
+ cookie = priv->transports_cookie;
for (walk = priv->transports; walk; walk = next) {
GstRTSPStreamTransport *trans = walk->data;
GstRTSPFilterResult res;
+ gboolean changed;
next = g_list_next (walk);
- if (func)
+ if (func) {
+ /* only visit each transport once */
+ if (g_hash_table_contains (visited, trans))
+ continue;
+
+ g_hash_table_add (visited, g_object_ref (trans));
+ g_mutex_unlock (&priv->lock);
+
res = func (stream, trans, user_data);
- else
+
+ g_mutex_lock (&priv->lock);
+ } else
res = GST_RTSP_FILTER_REF;
+ changed = (cookie != priv->transports_cookie);
+
switch (res) {
case GST_RTSP_FILTER_REMOVE:
update_transport (stream, trans, FALSE);
@@ -2527,9 +2546,14 @@ gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
default:
break;
}
+ if (changed)
+ goto restart;
}
g_mutex_unlock (&priv->lock);
+ if (func)
+ g_hash_table_unref (visited);
+
return result;
}