summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2020-04-07 17:58:43 +0200
committerWim Taymans <wtaymans@redhat.com>2020-04-07 17:58:43 +0200
commitb18dacde9aee94f5e784a2465363f662c9683135 (patch)
treea8260b3dd23c23567013d6e32e2ec33f82371d8a
parent029f43141851c68604bc573b2474545e31a3326b (diff)
spa: improve draining
Make a new DRAINED status. Place the DRAINED status on an input IO when a stream is out of buffers and draining. All nodes that don't have HAVE_DATA on the input io need to copy it to the output io and return the status. This makes sure the DRAINED is forwarded and nodes return DRAINED from _process() DRAINED on the resampler flushes out the last queued samples and then forwards the DRAINED in the next iteration. Emit a new drained signal from the context when a node returns DRAINED. Use this to trigger the drained signal in the stream.
-rw-r--r--spa/include/spa/node/io.h4
-rw-r--r--spa/plugins/audioconvert/audioadapter.c17
-rw-r--r--spa/plugins/audioconvert/audioconvert.c2
-rw-r--r--spa/plugins/audioconvert/channelmix.c6
-rw-r--r--spa/plugins/audioconvert/fmtconvert.c3
-rw-r--r--spa/plugins/audioconvert/resample.c22
-rw-r--r--spa/plugins/audioconvert/splitter.c2
-rw-r--r--src/pipewire/impl-node.c3
-rw-r--r--src/pipewire/private.h3
-rw-r--r--src/pipewire/stream.c26
10 files changed, 46 insertions, 42 deletions
diff --git a/spa/include/spa/node/io.h b/spa/include/spa/node/io.h
index bc780f88..eebc62a9 100644
--- a/spa/include/spa/node/io.h
+++ b/spa/include/spa/node/io.h
@@ -77,6 +77,9 @@ enum spa_io_type {
* If status is SPA_STATUS_STOPPED, some error occured on the
* port.
*
+ * If status is SPA_STATUS_DRAINED, data from the io area was
+ * used to drain.
+ *
* Status can also be a negative errno value to indicate errors.
* such as:
* -EINVAL: buffer_id is invalid
@@ -87,6 +90,7 @@ struct spa_io_buffers {
#define SPA_STATUS_NEED_DATA (1<<0)
#define SPA_STATUS_HAVE_DATA (1<<1)
#define SPA_STATUS_STOPPED (1<<2)
+#define SPA_STATUS_DRAINED (1<<3)
int32_t status; /**< the status code */
uint32_t buffer_id; /**< a buffer id */
};
diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c
index 21f61068..851cd389 100644
--- a/spa/plugins/audioconvert/audioadapter.c
+++ b/spa/plugins/audioconvert/audioadapter.c
@@ -865,7 +865,7 @@ impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
static int impl_node_process(void *object)
{
struct impl *this = object;
- int status;
+ int status = 0;
spa_log_trace_fp(this->log, "%p: process convert:%u master:%d",
this, this->use_converter, this->master);
@@ -875,22 +875,17 @@ static int impl_node_process(void *object)
status = spa_node_process(this->convert);
}
- status = spa_node_process(this->follower);
+ if (status >= 0)
+ status = spa_node_process(this->follower);
if (this->direction == SPA_DIRECTION_OUTPUT &&
!this->master && this->use_converter) {
- while (true) {
+ while (status >= 0) {
status = spa_node_process(this->convert);
- if (status & SPA_STATUS_HAVE_DATA)
+ if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED))
break;
-
- if (status & SPA_STATUS_NEED_DATA) {
+ if (status & SPA_STATUS_NEED_DATA)
status = spa_node_process(this->follower);
- if (!(status & SPA_STATUS_HAVE_DATA)) {
- spa_node_call_xrun(&this->callbacks, 0, 0, NULL);
- break;
- }
- }
}
}
spa_log_trace_fp(this->log, "%p: process status:%d", this, status);
diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c
index c308ab7e..08a6c03e 100644
--- a/spa/plugins/audioconvert/audioconvert.c
+++ b/spa/plugins/audioconvert/audioconvert.c
@@ -1084,7 +1084,7 @@ static int impl_node_process(void *object)
if (SPA_UNLIKELY(i == 0))
res |= r & SPA_STATUS_NEED_DATA;
if (SPA_UNLIKELY(i == this->n_nodes-1))
- res |= r & SPA_STATUS_HAVE_DATA;
+ res |= r & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED);
}
if (res & SPA_STATUS_HAVE_DATA)
break;
diff --git a/spa/plugins/audioconvert/channelmix.c b/spa/plugins/audioconvert/channelmix.c
index 651928a0..2132af00 100644
--- a/spa/plugins/audioconvert/channelmix.c
+++ b/spa/plugins/audioconvert/channelmix.c
@@ -864,15 +864,13 @@ static int impl_node_process(void *object)
if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA))
return SPA_STATUS_HAVE_DATA;
-
- if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
- return SPA_STATUS_NEED_DATA;
-
/* recycle */
if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) {
recycle_buffer(this, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID;
}
+ if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
+ return outio->status = inio->status;
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL;
diff --git a/spa/plugins/audioconvert/fmtconvert.c b/spa/plugins/audioconvert/fmtconvert.c
index 4fc3e0c6..5c58b8df 100644
--- a/spa/plugins/audioconvert/fmtconvert.c
+++ b/spa/plugins/audioconvert/fmtconvert.c
@@ -844,7 +844,8 @@ static int impl_node_process(void *object)
outio->buffer_id = SPA_ID_INVALID;
}
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
- return SPA_STATUS_NEED_DATA;
+ return outio->status = inio->status;
+
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL;
diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c
index 68aed53b..dd615fde 100644
--- a/spa/plugins/audioconvert/resample.c
+++ b/spa/plugins/audioconvert/resample.c
@@ -118,6 +118,7 @@ struct impl {
int mode;
unsigned int started:1;
unsigned int peaks:1;
+ unsigned int drained:1;
struct resample resample;
};
@@ -723,6 +724,7 @@ static int impl_node_process(void *object)
void **dst_datas;
bool flush_out = false;
bool flush_in = false;
+ bool draining = false;
spa_return_val_if_fail(this != NULL, -EINVAL);
@@ -741,15 +743,18 @@ static int impl_node_process(void *object)
if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA))
return SPA_STATUS_HAVE_DATA;
-
- if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
- return SPA_STATUS_NEED_DATA;
-
/* recycle */
if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) {
recycle_buffer(this, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID;
}
+ if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) {
+ if (inio->status != SPA_STATUS_DRAINED || this->drained)
+ return outio->status = inio->status;
+
+ inio->buffer_id = 0;
+ inport->buffers[0].outbuf->datas[0].chunk->size = 0;
+ }
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL;
@@ -784,7 +789,7 @@ static int impl_node_process(void *object)
size = sb->datas[0].maxsize;
memset(sb->datas[0].data, 0, size);
inport->offset = 0;
- flush_in = true;
+ flush_in = draining = true;
}
if (this->io_rate_match) {
@@ -829,18 +834,19 @@ static int impl_node_process(void *object)
if (inport->offset >= size || flush_in) {
inio->status = SPA_STATUS_NEED_DATA;
inport->offset = 0;
- SPA_FLAG_SET(res, SPA_STATUS_NEED_DATA);
- spa_log_trace_fp(this->log, NAME " %p: return input buffer", this);
+ SPA_FLAG_SET(res, inio->status);
+ spa_log_trace_fp(this->log, NAME " %p: return input buffer of size %d", this, size);
}
outport->offset += out_len * sizeof(float);
if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) {
outio->status = SPA_STATUS_HAVE_DATA;
outio->buffer_id = dbuf->id;
+ spa_log_trace_fp(this->log, NAME " %p: have output buffer of size %d", this, outport->offset);
dequeue_buffer(this, dbuf);
outport->offset = 0;
+ this->drained = draining;
SPA_FLAG_SET(res, SPA_STATUS_HAVE_DATA);
- spa_log_trace_fp(this->log, NAME " %p: have output buffer", this);
}
if (out_len == 0 && this->peaks) {
outio->status = SPA_STATUS_HAVE_DATA;
diff --git a/spa/plugins/audioconvert/splitter.c b/spa/plugins/audioconvert/splitter.c
index 48dc1c3c..6877df8f 100644
--- a/spa/plugins/audioconvert/splitter.c
+++ b/spa/plugins/audioconvert/splitter.c
@@ -860,7 +860,7 @@ static int impl_node_process(void *object)
inio, inio->status, inio->buffer_id);
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
- return SPA_STATUS_NEED_DATA;
+ return inio->status;
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL;
diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c
index 450eb87f..13473850 100644
--- a/src/pipewire/impl-node.c
+++ b/src/pipewire/impl-node.c
@@ -906,6 +906,9 @@ static inline int process_node(void *data)
} else {
resume_node(this, status);
}
+ if (status & SPA_STATUS_DRAINED) {
+ pw_context_driver_emit_drained(this->context, this);
+ }
return 0;
}
diff --git a/src/pipewire/private.h b/src/pipewire/private.h
index 795614ca..a59698b2 100644
--- a/src/pipewire/private.h
+++ b/src/pipewire/private.h
@@ -235,6 +235,7 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq,
#define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n)
#define pw_context_driver_emit_incomplete(c,n) pw_context_driver_emit(c, incomplete, 0, n)
#define pw_context_driver_emit_timeout(c,n) pw_context_driver_emit(c, timeout, 0, n)
+#define pw_context_driver_emit_drained(c,n) pw_context_driver_emit(c, drained, 0, n)
struct pw_context_driver_events {
#define PW_VERSION_CONTEXT_DRIVER_EVENTS 0
@@ -248,6 +249,8 @@ struct pw_context_driver_events {
void (*incomplete) (void *data, struct pw_impl_node *node);
/** The driver got a sync timeout */
void (*timeout) (void *data, struct pw_impl_node *node);
+ /** a node drained */
+ void (*drained) (void *data, struct pw_impl_node *node);
};
#define pw_registry_resource(r,m,v,...) pw_resource_call(r, struct pw_registry_events,m,v,##__VA_ARGS__)
diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c
index d1b41e19..ec848243 100644
--- a/src/pipewire/stream.c
+++ b/src/pipewire/stream.c
@@ -331,7 +331,6 @@ do_call_drained(struct spa_loop *loop,
struct pw_stream *stream = &impl->this;
pw_log_trace(NAME" %p: drained", stream);
pw_stream_emit_drained(stream);
- impl->draining = false;
return 0;
}
@@ -765,8 +764,7 @@ again:
pw_log_trace(NAME" %p: process out status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64, stream,
io->status, io->buffer_id, impl->time.ticks, impl->time.delay);
- res = 0;
- if (io->status != SPA_STATUS_HAVE_DATA) {
+ if ((res = io->status) != SPA_STATUS_HAVE_DATA) {
/* recycle old buffer */
if ((b = get_buffer(stream, io->buffer_id)) != NULL) {
pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
@@ -776,20 +774,17 @@ again:
/* pop new buffer */
if ((b = pop_queue(impl, &impl->queued)) != NULL) {
io->buffer_id = b->id;
- io->status = SPA_STATUS_HAVE_DATA;
+ res = io->status = SPA_STATUS_HAVE_DATA;
pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io);
+ } else if (impl->draining) {
+ impl->drained = true;
+ io->buffer_id = SPA_ID_INVALID;
+ res = io->status = SPA_STATUS_DRAINED;
+ pw_log_trace(NAME" %p: draining", stream);
} else {
io->buffer_id = SPA_ID_INVALID;
- io->status = SPA_STATUS_NEED_DATA;
+ res = io->status = SPA_STATUS_NEED_DATA;
pw_log_trace(NAME" %p: no more buffers %p", stream, io);
- if (impl->draining && !impl->drained) {
- b = pop_queue(impl, &impl->dequeued);
- io->buffer_id = b->id;
- io->status = SPA_STATUS_HAVE_DATA;
- b->this.buffer->datas[0].chunk->size = 0;
- pw_log_trace(NAME" %p: drain buffer %d", stream, b->id);
- impl->drained = true;
- }
}
}
@@ -803,7 +798,6 @@ again:
}
copy_position(impl, impl->queued.outcount);
- res = io->status;
pw_log_trace(NAME" %p: res %d", stream, res);
return res;
@@ -1044,7 +1038,7 @@ static const struct pw_core_events core_events = {
.error = on_core_error,
};
-static void context_xrun(void *data, struct pw_impl_node *node)
+static void context_drained(void *data, struct pw_impl_node *node)
{
struct stream *impl = data;
if (impl->node != node)
@@ -1055,7 +1049,7 @@ static void context_xrun(void *data, struct pw_impl_node *node)
static const struct pw_context_driver_events context_events = {
PW_VERSION_CONTEXT_DRIVER_EVENTS,
- .xrun = context_xrun,
+ .drained = context_drained,
};
static struct stream *