diff options
author | Wim Taymans <wim.taymans@collabora.co.uk> | 2012-12-14 11:36:58 +0100 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2012-12-14 11:51:16 +0100 |
commit | 95c384136a1dfc22ad656c13b547c4077566d80b (patch) | |
tree | de0574ced0244146fe4ca162dbdab99776cb87e7 | |
parent | 093d292b67c22d7adf2bdbe647f9827eff73ecf7 (diff) |
rtspconnection: add limit to queued messages
Add a limit to the amount of queued bytes or messages we allow on the watch.
API: GstRTSPConnection::gst_rtsp_watch_set_send_backlog()
API: GstRTSPConnection::gst_rtsp_watch_get_send_backlog()
Conflicts:
gst-libs/gst/rtsp/gstrtspconnection.c
-rw-r--r-- | gst-libs/gst/rtsp/gstrtspconnection.c | 89 | ||||
-rw-r--r-- | gst-libs/gst/rtsp/gstrtspconnection.h | 5 |
2 files changed, 91 insertions, 3 deletions
diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 543d407a0..a76dc6be0 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -3062,10 +3062,13 @@ struct _GstRTSPWatch guint id; GMutex *mutex; GQueue *messages; + gsize messages_bytes; guint8 *write_data; guint write_off; guint write_size; guint write_id; + gsize max_bytes; + guint max_messages; GstRTSPWatchFuncs funcs; @@ -3213,6 +3216,8 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, if (rec == NULL) break; + watch->messages_bytes -= rec->size; + watch->write_off = 0; watch->write_data = rec->data; watch->write_size = rec->size; @@ -3317,6 +3322,7 @@ gst_rtsp_source_finalize (GSource * source) g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); g_queue_free (watch->messages); watch->messages = NULL; + watch->messages_bytes = 0; g_free (watch->write_data); g_mutex_free (watch->mutex); @@ -3454,9 +3460,63 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) } /** + * gst_rtsp_watch_set_send_backlog: + * @watch: a #GstRTSPWatch + * @bytes: maximum bytes + * @messages: maximum messages + * + * Set the maximum amount of bytes and messages that will be queued in @watch. + * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and + * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM. + * + * A value of 0 for @bytes or @messages means no limits. + * + * Since: 0.10.37 + */ +void +gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch, + gsize bytes, guint messages) +{ + g_return_if_fail (watch != NULL); + + g_mutex_lock (watch->mutex); + watch->max_bytes = bytes; + watch->max_messages = messages; + g_mutex_unlock (watch->mutex); + + GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u", + bytes, messages); +} + +/** + * gst_rtsp_watch_get_send_backlog: + * @watch: a #GstRTSPWatch + * @bytes: (out) (allow-none): maximum bytes + * @messages: (out) (allow-none): maximum messages + * + * Get the maximum amount of bytes and messages that will be queued in @watch. + * See gst_rtsp_watch_set_send_backlog(). + * + * Since: 0.10.37 + */ +void +gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch, + gsize * bytes, guint * messages) +{ + g_return_if_fail (watch != NULL); + + g_mutex_lock (watch->mutex); + if (bytes) + *bytes = watch->max_bytes; + if (messages) + *messages = watch->max_messages; + g_mutex_unlock (watch->mutex); +} + +/** * gst_rtsp_watch_write_data: * @watch: a #GstRTSPWatch - * @data: the data to queue + * @data: (array length=size) (transfer full): the data to queue * @size: the size of @data * @id: location for a message ID or %NULL * @@ -3469,7 +3529,12 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch) * * This function will take ownership of @data and g_free() it after use. * - * Returns: #GST_RTSP_OK on success. + * If the amount of queued data exceeds the limits set with + * gst_rtsp_watch_set_send_backlog(), this function will return + * #GST_RTSP_ENOMEM. + * + * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits + * are reached. * * Since: 0.10.25 */ @@ -3499,6 +3564,12 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, } } + /* check limits */ + if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) || + (watch->max_messages != 0 + && watch->messages->length >= watch->max_messages)) + goto too_much_backlog; + /* make a record with the data and id for sending async */ rec = g_slice_new (GstRTSPRec); if (off == 0) { @@ -3515,8 +3586,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, rec->id = ++watch->id; } while (G_UNLIKELY (rec->id == 0)); - /* add the record to a queue. FIXME we would like to have an upper limit here */ + /* add the record to a queue. */ g_queue_push_head (watch->messages, rec); + watch->messages_bytes += rec->size; /* make sure the main context will now also check for writability on the * socket */ @@ -3536,6 +3608,17 @@ done: g_main_context_wakeup (context); return res; + + /* ERRORS */ +too_much_backlog: + { + GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %" + G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes, + watch->messages_bytes, watch->max_messages, watch->messages->length); + g_mutex_unlock (watch->mutex); + g_free ((gpointer) data); + return GST_RTSP_ENOMEM; + } } /** diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h index 2a34abd0f..2ecd32564 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.h +++ b/gst-libs/gst/rtsp/gstrtspconnection.h @@ -189,6 +189,11 @@ void gst_rtsp_watch_unref (GstRTSPWatch *watch); guint gst_rtsp_watch_attach (GstRTSPWatch *watch, GMainContext *context); +void gst_rtsp_watch_set_send_backlog (GstRTSPWatch *watch, + gsize bytes, guint messages); +void gst_rtsp_watch_get_send_backlog (GstRTSPWatch *watch, + gsize *bytes, guint *messages); + GstRTSPResult gst_rtsp_watch_write_data (GstRTSPWatch *watch, const guint8 *data, guint size, guint *id); |