diff options
author | Matthew Waters <matthew@centricular.com> | 2014-10-06 18:23:03 +1100 |
---|---|---|
committer | Matthew Waters <matthew@centricular.com> | 2014-10-09 23:52:11 +1100 |
commit | a41bc98b6e7436e5bb4a465b55674f47a98a6f5f (patch) | |
tree | 90ce14a96c74b46f1f2e00644e1afaae05447fd6 | |
parent | c23cd9c3be90e960f3c487640bb75f3cda2b93b0 (diff) |
aggregator: add a timeout property determining buffer wait time
Determines the amount of time that a pad will wait for a buffer before
being marked unresponsive.
Network sources may fail to produce buffers for an extended period of time,
currently causing the pipeline to stall possibly indefinitely, waiting for
these buffers to appear.
Subclasses should render unresponsive pads with either silence (audio), the
last (video) frame or what makes the most sense in the given context.
-rw-r--r-- | gst-libs/gst/base/gstaggregator.c | 177 | ||||
-rw-r--r-- | gst-libs/gst/base/gstaggregator.h | 6 |
2 files changed, 178 insertions, 5 deletions
diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index dce718073..42c0d8e3b 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -69,6 +69,9 @@ /* Might become API */ static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout); +static gint64 gst_aggregator_get_timeout (GstAggregator * agg); + GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -145,6 +148,8 @@ struct _GstAggregatorPadPrivate gboolean pending_eos; gboolean flushing; + GstClockID timeout_id; + GMutex event_lock; GCond event_cond; @@ -232,6 +237,15 @@ typedef struct gboolean one_actually_seeked; } EventData; +#define DEFAULT_TIMEOUT -1 + +enum +{ + PROP_0, + PROP_TIMEOUT, + PROP_LAST +}; + /** * gst_aggregator_iterate_sinkpads: * @self: The #GstAggregator @@ -311,13 +325,18 @@ no_iter: } static inline gboolean -_check_all_pads_with_data_or_eos (GstAggregator * self, +_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self, GstAggregatorPad * aggpad) { if (aggpad->buffer || aggpad->eos) { return TRUE; } + if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) { + /* pad has been deemed unresponsive */ + return TRUE; + } + GST_LOG_OBJECT (aggpad, "Not ready to be aggregated"); return FALSE; @@ -453,8 +472,8 @@ aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && gst_aggregator_iterate_sinkpads (self, - (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, - NULL) && priv->running) { + (GstAggregatorPadForeachFunc) + _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) { GST_TRACE_OBJECT (self, "Actually aggregating!"); priv->flow_return = klass->aggregate (self); @@ -714,10 +733,19 @@ eat: } static gboolean -_flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) +_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata) { _aggpad_flush (pad, self); + PAD_LOCK_EVENT (pad); + /* remove the timeouts */ + if (pad->priv->timeout_id) { + gst_clock_id_unschedule (pad->priv->timeout_id); + gst_clock_id_unref (pad->priv->timeout_id); + pad->priv->timeout_id = NULL; + } + PAD_UNLOCK_EVENT (pad); + return TRUE; } @@ -727,7 +755,7 @@ _stop (GstAggregator * agg) _reset_flow_values (agg); gst_aggregator_iterate_sinkpads (agg, - (GstAggregatorPadForeachFunc) _flush_pad, NULL); + (GstAggregatorPadForeachFunc) _stop_pad, NULL); if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); @@ -795,6 +823,37 @@ _release_pad (GstElement * element, GstPad * pad) QUEUE_PUSH (self); } +static gboolean +_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id, + gpointer user_data) +{ + GstAggregatorPad *aggpad; + GstAggregator *self; + + if (user_data == NULL) + return FALSE; + + aggpad = GST_AGGREGATOR_PAD (user_data); + + /* avoid holding the last reference to the parent element here */ + PAD_LOCK_EVENT (aggpad); + + self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad))); + + GST_DEBUG_OBJECT (aggpad, "marked unresponsive"); + + g_atomic_int_set (&aggpad->unresponsive, TRUE); + + if (self) { + QUEUE_PUSH (self); + gst_object_unref (self); + } + + PAD_UNLOCK_EVENT (aggpad); + + return TRUE; +} + static GstPad * _request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) @@ -827,6 +886,7 @@ _request_new_pad (GstElement * element, agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); + GST_OBJECT_UNLOCK (element); } else { @@ -1127,6 +1187,7 @@ gst_aggregator_finalize (GObject * object) { GstAggregator *self = (GstAggregator *) object; + gst_object_unref (self->clock); g_mutex_clear (&self->priv->setcaps_lock); G_OBJECT_CLASS (aggregator_parent_class)->finalize (object); @@ -1145,6 +1206,82 @@ gst_aggregator_dispose (GObject * object) } } +/** + * gst_aggregator_set_timeout: + * @agg: a #GstAggregator + * @timeout: the new timeout value. + * + * Sets the new timeout value to @timeout. This value is used to limit the + * amount of time a pad waits for data to appear before considering the pad + * as unresponsive. + */ +static void +gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout) +{ + g_return_if_fail (GST_IS_AGGREGATOR (agg)); + + GST_OBJECT_LOCK (agg); + agg->timeout = timeout; + GST_OBJECT_UNLOCK (agg); +} + +/** + * gst_aggregator_get_timeout: + * @agg: a #GstAggregator + * + * Gets the timeout value. See gst_aggregator_set_timeout for + * more details. + * + * Returns: The time in nanoseconds to wait for data to arrive on a sink pad + * before a pad is deemed unresponsive. A value of -1 means an + * unlimited time. + */ +static gint64 +gst_aggregator_get_timeout (GstAggregator * agg) +{ + gint64 res; + + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + + GST_OBJECT_LOCK (agg); + res = agg->timeout; + GST_OBJECT_UNLOCK (agg); + + return res; +} + +static void +gst_aggregator_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_TIMEOUT: + gst_aggregator_set_timeout (agg, g_value_get_int64 (value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + switch (prop_id) { + case PROP_TIMEOUT: + g_value_set_int64 (value, gst_aggregator_get_timeout (agg)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + /* GObject vmethods implementations */ static void gst_aggregator_class_init (GstAggregatorClass * klass) @@ -1173,8 +1310,17 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state); + gobject_class->set_property = gst_aggregator_set_property; + gobject_class->get_property = gst_aggregator_get_property; gobject_class->finalize = gst_aggregator_finalize; gobject_class->dispose = gst_aggregator_dispose; + + g_object_class_install_property (gobject_class, PROP_TIMEOUT, + g_param_spec_int64 ("timeout", "Buffer timeout", + "Number of nanoseconds to wait for a buffer to arrive on a sink pad" + "before the pad is deemed unresponsive (-1 unlimited)", -1, + G_MAXINT64, DEFAULT_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void @@ -1211,6 +1357,9 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + self->clock = gst_system_clock_obtain (); + self->timeout = -1; + g_mutex_init (&self->priv->setcaps_lock); } @@ -1250,10 +1399,19 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) GstAggregatorPrivate *priv = self->priv; GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); + GstClockTime timeout = gst_aggregator_get_timeout (self); + GstClockTime now; GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); + if (aggpad->priv->timeout_id) { + gst_clock_id_unschedule (aggpad->priv->timeout_id); + gst_clock_id_unref (aggpad->priv->timeout_id); + aggpad->priv->timeout_id = NULL; + } + g_atomic_int_set (&aggpad->unresponsive, FALSE); PAD_STREAM_LOCK (aggpad); + if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) goto flushing; @@ -1261,6 +1419,7 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) goto eos; PAD_LOCK_EVENT (aggpad); + if (aggpad->buffer) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); @@ -1283,6 +1442,14 @@ _chain (GstPad * pad, GstObject * object, GstBuffer * buffer) QUEUE_PUSH (self); + if (GST_CLOCK_TIME_IS_VALID (timeout)) { + now = gst_clock_get_time (self->clock); + aggpad->priv->timeout_id = + gst_clock_new_single_shot_id (self->clock, now + timeout); + gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout, + gst_object_ref (aggpad), gst_object_unref); + } + GST_DEBUG_OBJECT (aggpad, "Done chaining"); return priv->flow_return; diff --git a/gst-libs/gst/base/gstaggregator.h b/gst-libs/gst/base/gstaggregator.h index 507da136c..837fdbb34 100644 --- a/gst-libs/gst/base/gstaggregator.h +++ b/gst-libs/gst/base/gstaggregator.h @@ -73,6 +73,7 @@ struct _GstAggregatorPad GstBuffer * buffer; GstSegment segment; gboolean eos; + gboolean unresponsive; /* < Private > */ GstAggregatorPadPrivate * priv; @@ -137,6 +138,11 @@ struct _GstAggregator /*< private >*/ GstAggregatorPrivate * priv; + GstClock * clock; + + /* properties */ + gint64 timeout; + gpointer _gst_reserved[GST_PADDING]; }; |