diff options
author | Wim Taymans <wtaymans@redhat.com> | 2020-04-07 17:58:43 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2020-04-07 17:58:43 +0200 |
commit | b18dacde9aee94f5e784a2465363f662c9683135 (patch) | |
tree | a8260b3dd23c23567013d6e32e2ec33f82371d8a | |
parent | 029f43141851c68604bc573b2474545e31a3326b (diff) |
spa: improve draining
Make a new DRAINED status.
Place the DRAINED status on an input IO when a stream is out of
buffers and draining.
All nodes that don't have HAVE_DATA on the input io need to copy
it to the output io and return the status. This makes sure the
DRAINED is forwarded and nodes return DRAINED from _process()
DRAINED on the resampler flushes out the last queued samples and then
forwards the DRAINED in the next iteration.
Emit a new drained signal from the context when a node returns
DRAINED. Use this to trigger the drained signal in the stream.
-rw-r--r-- | spa/include/spa/node/io.h | 4 | ||||
-rw-r--r-- | spa/plugins/audioconvert/audioadapter.c | 17 | ||||
-rw-r--r-- | spa/plugins/audioconvert/audioconvert.c | 2 | ||||
-rw-r--r-- | spa/plugins/audioconvert/channelmix.c | 6 | ||||
-rw-r--r-- | spa/plugins/audioconvert/fmtconvert.c | 3 | ||||
-rw-r--r-- | spa/plugins/audioconvert/resample.c | 22 | ||||
-rw-r--r-- | spa/plugins/audioconvert/splitter.c | 2 | ||||
-rw-r--r-- | src/pipewire/impl-node.c | 3 | ||||
-rw-r--r-- | src/pipewire/private.h | 3 | ||||
-rw-r--r-- | src/pipewire/stream.c | 26 |
10 files changed, 46 insertions, 42 deletions
diff --git a/spa/include/spa/node/io.h b/spa/include/spa/node/io.h index bc780f88..eebc62a9 100644 --- a/spa/include/spa/node/io.h +++ b/spa/include/spa/node/io.h @@ -77,6 +77,9 @@ enum spa_io_type { * If status is SPA_STATUS_STOPPED, some error occured on the * port. * + * If status is SPA_STATUS_DRAINED, data from the io area was + * used to drain. + * * Status can also be a negative errno value to indicate errors. * such as: * -EINVAL: buffer_id is invalid @@ -87,6 +90,7 @@ struct spa_io_buffers { #define SPA_STATUS_NEED_DATA (1<<0) #define SPA_STATUS_HAVE_DATA (1<<1) #define SPA_STATUS_STOPPED (1<<2) +#define SPA_STATUS_DRAINED (1<<3) int32_t status; /**< the status code */ uint32_t buffer_id; /**< a buffer id */ }; diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index 21f61068..851cd389 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -865,7 +865,7 @@ impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) static int impl_node_process(void *object) { struct impl *this = object; - int status; + int status = 0; spa_log_trace_fp(this->log, "%p: process convert:%u master:%d", this, this->use_converter, this->master); @@ -875,22 +875,17 @@ static int impl_node_process(void *object) status = spa_node_process(this->convert); } - status = spa_node_process(this->follower); + if (status >= 0) + status = spa_node_process(this->follower); if (this->direction == SPA_DIRECTION_OUTPUT && !this->master && this->use_converter) { - while (true) { + while (status >= 0) { status = spa_node_process(this->convert); - if (status & SPA_STATUS_HAVE_DATA) + if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) break; - - if (status & SPA_STATUS_NEED_DATA) { + if (status & SPA_STATUS_NEED_DATA) status = spa_node_process(this->follower); - if (!(status & SPA_STATUS_HAVE_DATA)) { - spa_node_call_xrun(&this->callbacks, 0, 0, NULL); - break; - } - } } } spa_log_trace_fp(this->log, "%p: process status:%d", this, status); diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index c308ab7e..08a6c03e 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -1084,7 +1084,7 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(i == 0)) res |= r & SPA_STATUS_NEED_DATA; if (SPA_UNLIKELY(i == this->n_nodes-1)) - res |= r & SPA_STATUS_HAVE_DATA; + res |= r & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED); } if (res & SPA_STATUS_HAVE_DATA) break; diff --git a/spa/plugins/audioconvert/channelmix.c b/spa/plugins/audioconvert/channelmix.c index 651928a0..2132af00 100644 --- a/spa/plugins/audioconvert/channelmix.c +++ b/spa/plugins/audioconvert/channelmix.c @@ -864,15 +864,13 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) return SPA_STATUS_HAVE_DATA; - - if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; - /* recycle */ if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { recycle_buffer(this, outio->buffer_id); outio->buffer_id = SPA_ID_INVALID; } + if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) + return outio->status = inio->status; if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; diff --git a/spa/plugins/audioconvert/fmtconvert.c b/spa/plugins/audioconvert/fmtconvert.c index 4fc3e0c6..5c58b8df 100644 --- a/spa/plugins/audioconvert/fmtconvert.c +++ b/spa/plugins/audioconvert/fmtconvert.c @@ -844,7 +844,8 @@ static int impl_node_process(void *object) outio->buffer_id = SPA_ID_INVALID; } if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; + return outio->status = inio->status; + if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c index 68aed53b..dd615fde 100644 --- a/spa/plugins/audioconvert/resample.c +++ b/spa/plugins/audioconvert/resample.c @@ -118,6 +118,7 @@ struct impl { int mode; unsigned int started:1; unsigned int peaks:1; + unsigned int drained:1; struct resample resample; }; @@ -723,6 +724,7 @@ static int impl_node_process(void *object) void **dst_datas; bool flush_out = false; bool flush_in = false; + bool draining = false; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -741,15 +743,18 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) return SPA_STATUS_HAVE_DATA; - - if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; - /* recycle */ if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { recycle_buffer(this, outio->buffer_id); outio->buffer_id = SPA_ID_INVALID; } + if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) { + if (inio->status != SPA_STATUS_DRAINED || this->drained) + return outio->status = inio->status; + + inio->buffer_id = 0; + inport->buffers[0].outbuf->datas[0].chunk->size = 0; + } if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; @@ -784,7 +789,7 @@ static int impl_node_process(void *object) size = sb->datas[0].maxsize; memset(sb->datas[0].data, 0, size); inport->offset = 0; - flush_in = true; + flush_in = draining = true; } if (this->io_rate_match) { @@ -829,18 +834,19 @@ static int impl_node_process(void *object) if (inport->offset >= size || flush_in) { inio->status = SPA_STATUS_NEED_DATA; inport->offset = 0; - SPA_FLAG_SET(res, SPA_STATUS_NEED_DATA); - spa_log_trace_fp(this->log, NAME " %p: return input buffer", this); + SPA_FLAG_SET(res, inio->status); + spa_log_trace_fp(this->log, NAME " %p: return input buffer of size %d", this, size); } outport->offset += out_len * sizeof(float); if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) { outio->status = SPA_STATUS_HAVE_DATA; outio->buffer_id = dbuf->id; + spa_log_trace_fp(this->log, NAME " %p: have output buffer of size %d", this, outport->offset); dequeue_buffer(this, dbuf); outport->offset = 0; + this->drained = draining; SPA_FLAG_SET(res, SPA_STATUS_HAVE_DATA); - spa_log_trace_fp(this->log, NAME " %p: have output buffer", this); } if (out_len == 0 && this->peaks) { outio->status = SPA_STATUS_HAVE_DATA; diff --git a/spa/plugins/audioconvert/splitter.c b/spa/plugins/audioconvert/splitter.c index 48dc1c3c..6877df8f 100644 --- a/spa/plugins/audioconvert/splitter.c +++ b/spa/plugins/audioconvert/splitter.c @@ -860,7 +860,7 @@ static int impl_node_process(void *object) inio, inio->status, inio->buffer_id); if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; + return inio->status; if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 450eb87f..13473850 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -906,6 +906,9 @@ static inline int process_node(void *data) } else { resume_node(this, status); } + if (status & SPA_STATUS_DRAINED) { + pw_context_driver_emit_drained(this->context, this); + } return 0; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 795614ca..a59698b2 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -235,6 +235,7 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq, #define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n) #define pw_context_driver_emit_incomplete(c,n) pw_context_driver_emit(c, incomplete, 0, n) #define pw_context_driver_emit_timeout(c,n) pw_context_driver_emit(c, timeout, 0, n) +#define pw_context_driver_emit_drained(c,n) pw_context_driver_emit(c, drained, 0, n) struct pw_context_driver_events { #define PW_VERSION_CONTEXT_DRIVER_EVENTS 0 @@ -248,6 +249,8 @@ struct pw_context_driver_events { void (*incomplete) (void *data, struct pw_impl_node *node); /** The driver got a sync timeout */ void (*timeout) (void *data, struct pw_impl_node *node); + /** a node drained */ + void (*drained) (void *data, struct pw_impl_node *node); }; #define pw_registry_resource(r,m,v,...) pw_resource_call(r, struct pw_registry_events,m,v,##__VA_ARGS__) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index d1b41e19..ec848243 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -331,7 +331,6 @@ do_call_drained(struct spa_loop *loop, struct pw_stream *stream = &impl->this; pw_log_trace(NAME" %p: drained", stream); pw_stream_emit_drained(stream); - impl->draining = false; return 0; } @@ -765,8 +764,7 @@ again: pw_log_trace(NAME" %p: process out status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64, stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay); - res = 0; - if (io->status != SPA_STATUS_HAVE_DATA) { + if ((res = io->status) != SPA_STATUS_HAVE_DATA) { /* recycle old buffer */ if ((b = get_buffer(stream, io->buffer_id)) != NULL) { pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); @@ -776,20 +774,17 @@ again: /* pop new buffer */ if ((b = pop_queue(impl, &impl->queued)) != NULL) { io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_DATA; + res = io->status = SPA_STATUS_HAVE_DATA; pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io); + } else if (impl->draining) { + impl->drained = true; + io->buffer_id = SPA_ID_INVALID; + res = io->status = SPA_STATUS_DRAINED; + pw_log_trace(NAME" %p: draining", stream); } else { io->buffer_id = SPA_ID_INVALID; - io->status = SPA_STATUS_NEED_DATA; + res = io->status = SPA_STATUS_NEED_DATA; pw_log_trace(NAME" %p: no more buffers %p", stream, io); - if (impl->draining && !impl->drained) { - b = pop_queue(impl, &impl->dequeued); - io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_DATA; - b->this.buffer->datas[0].chunk->size = 0; - pw_log_trace(NAME" %p: drain buffer %d", stream, b->id); - impl->drained = true; - } } } @@ -803,7 +798,6 @@ again: } copy_position(impl, impl->queued.outcount); - res = io->status; pw_log_trace(NAME" %p: res %d", stream, res); return res; @@ -1044,7 +1038,7 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; -static void context_xrun(void *data, struct pw_impl_node *node) +static void context_drained(void *data, struct pw_impl_node *node) { struct stream *impl = data; if (impl->node != node) @@ -1055,7 +1049,7 @@ static void context_xrun(void *data, struct pw_impl_node *node) static const struct pw_context_driver_events context_events = { PW_VERSION_CONTEXT_DRIVER_EVENTS, - .xrun = context_xrun, + .drained = context_drained, }; static struct stream * |