From 076da98efb18fd7df3020baed7ead7f22ef0f219 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 16 Aug 2007 11:40:16 +0000 Subject: gst/rtpmanager/gstrtpjitterbuffer.c: Fix EOS handling. Original commit message from CVS: * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain), (gst_rtp_jitter_buffer_loop): Fix EOS handling. Convert some DEBUG into WARNINGs. Pause task when flushing. * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), (rtcp_thread), (gst_rtp_session_event_recv_rtcp_sink): Use system clock for RTCP session management timeouts. * gst/rtpmanager/rtpsession.c: (on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout): Release the session lock when emiting signals. --- gst/rtpmanager/gstrtpjitterbuffer.c | 33 +++++++++++++++++++++------------ gst/rtpmanager/gstrtpsession.c | 18 ++++++++++++++++-- gst/rtpmanager/rtpsession.c | 12 ++++++++++++ 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index fe85f87f3..e66613bbf 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -696,16 +696,18 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event) /* check for flushing, we need to discard the event and return FALSE when * we are flushing */ ret = priv->srcresult == GST_FLOW_OK; - if (ret) { + if (ret && !priv->eos) { GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS"); priv->eos = TRUE; JBUF_SIGNAL (priv); + } else if (priv->eos) { + GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS"); } else { GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s", gst_flow_get_name (priv->srcresult)); - gst_event_unref (event); } JBUF_UNLOCK (priv); + gst_event_unref (event); break; } default: @@ -863,7 +865,7 @@ invalid_buffer: } not_negotiated: { - GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!"); + GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!"); gst_buffer_unref (buffer); gst_object_unref (jitterbuffer); return GST_FLOW_NOT_NEGOTIATED; @@ -878,13 +880,13 @@ out_flushing: have_eos: { ret = GST_FLOW_UNEXPECTED; - GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer"); + GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer"); gst_buffer_unref (buffer); goto finished; } too_late: { - GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" + GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" " popped, dropping", seqnum, priv->last_popped_seqnum); priv->num_late++; gst_buffer_unref (buffer); @@ -892,7 +894,7 @@ too_late: } duplicate: { - GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", + GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; gst_buffer_unref (buffer); @@ -923,13 +925,19 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer) JBUF_LOCK_CHECK (priv, flushing); again: GST_DEBUG_OBJECT (jitterbuffer, "Popping item"); - /* wait if we are blocked or don't have a packet and eos */ - while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf) - || priv->eos)) { + while (TRUE) { + + /* always wait if we are blocked */ + if (!priv->blocked) { + /* if we have a packet, we can grab it */ + if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0) + break; + /* no packets but we are EOS, do eos logic */ + if (priv->eos) + goto do_eos; + } JBUF_WAIT_CHECK (priv, flushing); } - if (priv->eos) - goto do_eos; /* pop a buffer, we must have a buffer now */ outbuf = rtp_jitter_buffer_pop (priv->jbuf); @@ -955,7 +963,7 @@ again: if (priv->next_seqnum != -1) { /* we expected next_seqnum but received something else, that's a gap */ - GST_DEBUG_OBJECT (jitterbuffer, + GST_WARNING_OBJECT (jitterbuffer, "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum, seqnum); } else { @@ -1092,6 +1100,7 @@ do_eos: flushing: { GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); + gst_pad_pause_task (priv->srcpad); if (outbuf) gst_buffer_unref (outbuf); JBUF_UNLOCK (priv); diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index bb47a29ed..01153e23c 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -537,9 +537,10 @@ rtcp_thread (GstRTPSession * rtpsession) GstClockTime current_time; GstClockTime next_timeout; - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + /* RTCP timeouts we use the system clock */ + clock = gst_system_clock_obtain (); if (clock == NULL) - return; + goto no_clock; current_time = gst_clock_get_time (clock); @@ -590,6 +591,15 @@ rtcp_thread (GstRTPSession * rtpsession) gst_object_unref (clock); GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); + return; + + /* ERRORS */ +no_clock: + { + GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL), + ("Could not get system clock")); + return; + } } static gboolean @@ -900,6 +910,10 @@ gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { default: + if (rtpsession->send_rtcp_src) { + gst_event_ref (event); + ret = gst_pad_push_event (rtpsession->send_rtcp_src, event); + } ret = gst_pad_push_event (rtpsession->sync_src, event); break; } diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 2b3bcb829..9ab3b4a05 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -251,39 +251,51 @@ rtp_session_get_property (GObject * object, guint prop_id, static void on_new_ssrc (RTPSession * sess, RTPSource * source) { + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source); + RTP_SESSION_LOCK (sess); } static void on_ssrc_collision (RTPSession * sess, RTPSource * source) { + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0, source); + RTP_SESSION_LOCK (sess); } static void on_ssrc_validated (RTPSession * sess, RTPSource * source) { + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0, source); + RTP_SESSION_LOCK (sess); } static void on_bye_ssrc (RTPSession * sess, RTPSource * source) { + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source); + RTP_SESSION_LOCK (sess); } static void on_bye_timeout (RTPSession * sess, RTPSource * source) { + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source); + RTP_SESSION_LOCK (sess); } static void on_timeout (RTPSession * sess, RTPSource * source) { + RTP_SESSION_UNLOCK (sess); g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source); + RTP_SESSION_LOCK (sess); } /** -- cgit v1.2.3