summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2012-12-14 11:36:58 +0100
committerWim Taymans <wim.taymans@collabora.co.uk>2012-12-14 11:51:16 +0100
commit95c384136a1dfc22ad656c13b547c4077566d80b (patch)
treede0574ced0244146fe4ca162dbdab99776cb87e7
parent093d292b67c22d7adf2bdbe647f9827eff73ecf7 (diff)
rtspconnection: add limit to queued messages
Add a limit to the amount of queued bytes or messages we allow on the watch. API: GstRTSPConnection::gst_rtsp_watch_set_send_backlog() API: GstRTSPConnection::gst_rtsp_watch_get_send_backlog() Conflicts: gst-libs/gst/rtsp/gstrtspconnection.c
-rw-r--r--gst-libs/gst/rtsp/gstrtspconnection.c89
-rw-r--r--gst-libs/gst/rtsp/gstrtspconnection.h5
2 files changed, 91 insertions, 3 deletions
diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c
index 543d407a0..a76dc6be0 100644
--- a/gst-libs/gst/rtsp/gstrtspconnection.c
+++ b/gst-libs/gst/rtsp/gstrtspconnection.c
@@ -3062,10 +3062,13 @@ struct _GstRTSPWatch
guint id;
GMutex *mutex;
GQueue *messages;
+ gsize messages_bytes;
guint8 *write_data;
guint write_off;
guint write_size;
guint write_id;
+ gsize max_bytes;
+ guint max_messages;
GstRTSPWatchFuncs funcs;
@@ -3213,6 +3216,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
if (rec == NULL)
break;
+ watch->messages_bytes -= rec->size;
+
watch->write_off = 0;
watch->write_data = rec->data;
watch->write_size = rec->size;
@@ -3317,6 +3322,7 @@ gst_rtsp_source_finalize (GSource * source)
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages);
watch->messages = NULL;
+ watch->messages_bytes = 0;
g_free (watch->write_data);
g_mutex_free (watch->mutex);
@@ -3454,9 +3460,63 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
}
/**
+ * gst_rtsp_watch_set_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: maximum bytes
+ * @messages: maximum messages
+ *
+ * Set the maximum amount of bytes and messages that will be queued in @watch.
+ * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
+ * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
+ *
+ * A value of 0 for @bytes or @messages means no limits.
+ *
+ * Since: 0.10.37
+ */
+void
+gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
+ gsize bytes, guint messages)
+{
+ g_return_if_fail (watch != NULL);
+
+ g_mutex_lock (watch->mutex);
+ watch->max_bytes = bytes;
+ watch->max_messages = messages;
+ g_mutex_unlock (watch->mutex);
+
+ GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
+ bytes, messages);
+}
+
+/**
+ * gst_rtsp_watch_get_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: (out) (allow-none): maximum bytes
+ * @messages: (out) (allow-none): maximum messages
+ *
+ * Get the maximum amount of bytes and messages that will be queued in @watch.
+ * See gst_rtsp_watch_set_send_backlog().
+ *
+ * Since: 0.10.37
+ */
+void
+gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
+ gsize * bytes, guint * messages)
+{
+ g_return_if_fail (watch != NULL);
+
+ g_mutex_lock (watch->mutex);
+ if (bytes)
+ *bytes = watch->max_bytes;
+ if (messages)
+ *messages = watch->max_messages;
+ g_mutex_unlock (watch->mutex);
+}
+
+/**
* gst_rtsp_watch_write_data:
* @watch: a #GstRTSPWatch
- * @data: the data to queue
+ * @data: (array length=size) (transfer full): the data to queue
* @size: the size of @data
* @id: location for a message ID or %NULL
*
@@ -3469,7 +3529,12 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
*
* This function will take ownership of @data and g_free() it after use.
*
- * Returns: #GST_RTSP_OK on success.
+ * If the amount of queued data exceeds the limits set with
+ * gst_rtsp_watch_set_send_backlog(), this function will return
+ * #GST_RTSP_ENOMEM.
+ *
+ * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
+ * are reached.
*
* Since: 0.10.25
*/
@@ -3499,6 +3564,12 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
}
}
+ /* check limits */
+ if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) ||
+ (watch->max_messages != 0
+ && watch->messages->length >= watch->max_messages))
+ goto too_much_backlog;
+
/* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec);
if (off == 0) {
@@ -3515,8 +3586,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
rec->id = ++watch->id;
} while (G_UNLIKELY (rec->id == 0));
- /* add the record to a queue. FIXME we would like to have an upper limit here */
+ /* add the record to a queue. */
g_queue_push_head (watch->messages, rec);
+ watch->messages_bytes += rec->size;
/* make sure the main context will now also check for writability on the
* socket */
@@ -3536,6 +3608,17 @@ done:
g_main_context_wakeup (context);
return res;
+
+ /* ERRORS */
+too_much_backlog:
+ {
+ GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
+ G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
+ watch->messages_bytes, watch->max_messages, watch->messages->length);
+ g_mutex_unlock (watch->mutex);
+ g_free ((gpointer) data);
+ return GST_RTSP_ENOMEM;
+ }
}
/**
diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h
index 2a34abd0f..2ecd32564 100644
--- a/gst-libs/gst/rtsp/gstrtspconnection.h
+++ b/gst-libs/gst/rtsp/gstrtspconnection.h
@@ -189,6 +189,11 @@ void gst_rtsp_watch_unref (GstRTSPWatch *watch);
guint gst_rtsp_watch_attach (GstRTSPWatch *watch,
GMainContext *context);
+void gst_rtsp_watch_set_send_backlog (GstRTSPWatch *watch,
+ gsize bytes, guint messages);
+void gst_rtsp_watch_get_send_backlog (GstRTSPWatch *watch,
+ gsize *bytes, guint *messages);
+
GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch,
const guint8 *data,
guint size, guint *id);