summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas Dufresne <nicolas.dufresne@collabora.com>2020-03-27 15:48:32 -0400
committerNicolas Dufresne <nicolas.dufresne@collabora.com>2020-06-08 17:54:53 -0400
commitb4f421e9aa880ddfa1804d626d9be1baf432ade8 (patch)
treebc18f12fc8e383a56bb2c6fcdc21a68e078d9a2a
parent0594d2f981cd714bc10bb35ebb39fbc09ebb1a95 (diff)
rtpjitterbuffer: Keep JBUF lock while processing timers
Until now, do_expected_timeout() was shortly dropping the JBUF_LOCK in order to push RTX event event without causing deadlock. As a side effect, some CPU hung would happen as the timerqueue would get filled while looping over the due timers. To mitigate this, we were processing the lost timer first and placing into a queue the remainign to be processed later. In the gap caused by an unlock, we could endup receiving one of the seqnum present in the pending timers. In that case, the timer would not be found and a new one was created. When we then update the expected timer, the seqnum would already exist and the updated timer would be lost. In this patch we remove the unlock from do_expected_timeout() and place all pending RTX event into a queue (instead of pending timer). Then, as soon as we have selected a timer to wait (or if there is no timer to wait for) we send all the upstream RTX events. As we no longer unlock, we no longer need to pop more then one timer from the queue, and we do so with the lock held, which blocks any new colliding timers from being created. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/616>
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c69
1 files changed, 42 insertions, 27 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index 03f2c7fd4f..1d698ba9c9 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -3774,7 +3774,7 @@ update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer,
/* the timeout for when we expected a packet expired */
static gboolean
do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
- GstClockTime now)
+ GstClockTime now, GQueue * events)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event;
@@ -3812,6 +3812,7 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
"deadline", G_TYPE_UINT, rtx_deadline_ms,
"packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
"avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
+ g_queue_push_tail (events, event);
GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
priv->num_rtx_requests++;
@@ -3849,10 +3850,6 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE);
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->sinkpad, event);
- JBUF_LOCK (priv);
-
return FALSE;
}
@@ -3922,13 +3919,13 @@ do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
static gboolean
do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
- GstClockTime now)
+ GstClockTime now, GQueue * events)
{
gboolean removed = FALSE;
switch (timer->type) {
case RTP_TIMER_EXPECTED:
- removed = do_expected_timeout (jitterbuffer, timer, now);
+ removed = do_expected_timeout (jitterbuffer, timer, now, events);
break;
case RTP_TIMER_LOST:
removed = do_lost_timeout (jitterbuffer, timer, now);
@@ -3943,6 +3940,35 @@ do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
return removed;
}
+static void
+push_rtx_events_unlocked (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstEvent *event;
+
+ while ((event = (GstEvent *) g_queue_pop_head (events)))
+ gst_pad_push_event (priv->sinkpad, event);
+}
+
+/* called with JBUF lock
+ *
+ * Pushes all events in @events queue.
+ *
+ * Returns: %TRUE if the timer thread is not longer running
+ */
+static void
+push_rtx_events (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+ if (events->length == 0)
+ return;
+
+ JBUF_UNLOCK (priv);
+ push_rtx_events_unlocked (jitterbuffer, events);
+ JBUF_LOCK (priv);
+}
+
/* called when we need to wait for the next timeout.
*
* We loop over the array of recorded timeouts and wait for the earliest one.
@@ -3959,7 +3985,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK (priv);
while (priv->timer_running) {
RtpTimer *timer = NULL;
- GQueue timers = G_QUEUE_INIT;
+ GQueue events = G_QUEUE_INIT;
/* don't produce data in paused */
while (priv->blocked) {
@@ -3992,25 +4018,8 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
/* Iterate expired "normal" timers */
- while ((timer = rtp_timer_queue_pop_until (priv->timers, now))) {
- do {
- if (timer->type == RTP_TIMER_LOST) {
- GST_DEBUG_OBJECT (jitterbuffer, "Weeding out expired lost timers");
- do_lost_timeout (jitterbuffer, timer, now);
- } else {
- g_queue_push_tail_link (&timers, (GList *) timer);
- }
- } while ((timer = rtp_timer_queue_pop_until (priv->timers, now)));
-
- /* execute the remaining timers */
- while ((timer = (RtpTimer *) g_queue_pop_head_link (&timers)))
- do_timeout (jitterbuffer, timer, now);
-
- /* do_expected_timeout(), called by do_timeout will drop the
- * JBUF_LOCK, so we need to check if we are still running */
- if (!priv->timer_running)
- goto stopping;
- }
+ while ((timer = rtp_timer_queue_pop_until (priv->timers, now)))
+ do_timeout (jitterbuffer, timer, now, &events);
timer = rtp_timer_queue_peek_earliest (priv->timers);
if (timer) {
@@ -4031,6 +4040,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
/* let's just push if there is no clock */
GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
now = timer->timeout;
+ push_rtx_events (jitterbuffer, &events);
continue;
}
@@ -4052,11 +4062,14 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
/* release the lock so that the other end can push stuff or unlock */
JBUF_UNLOCK (priv);
+ push_rtx_events_unlocked (jitterbuffer, &events);
+
ret = gst_clock_id_wait (id, &clock_jitter);
JBUF_LOCK (priv);
if (!priv->timer_running) {
+ g_queue_clear_full (&events, (GDestroyNotify) gst_event_unref);
gst_clock_id_unref (id);
priv->clock_id = NULL;
break;
@@ -4075,6 +4088,8 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
gst_clock_id_unref (id);
priv->clock_id = NULL;
} else {
+ push_rtx_events_unlocked (jitterbuffer, &events);
+
/* when draining the timers, the pusher thread will reuse our
* condition to wait for completion. Signal that thread before
* sleeping again here */