summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Henningsson <david.henningsson@canonical.com>2014-04-25 16:58:03 +0200
committerDavid Henningsson <david.henningsson@canonical.com>2014-06-27 14:23:09 +0200
commit4931637f822f23888c7477ad5bbfcf87b337db48 (patch)
tree39c3167b7863a0f9064a26f04f1c634e2551d67d
parentb06e61652533201f0b58ed339d4435099d180a36 (diff)
pstream: Allow reading/writing through srbchannel
For writing, we prefer writing through the srbchannel if one is available, and we have no ancil data to send. For reading, we support reading from both in parallel. This meant replicating a struct used for reading, so a lot of this patch is just a search/replace in do_read to use the appropriate channel for reading. Signed-off-by: David Henningsson <david.henningsson@canonical.com>
-rw-r--r--src/pulsecore/pstream.c230
-rw-r--r--src/pulsecore/pstream.h5
2 files changed, 158 insertions, 77 deletions
diff --git a/src/pulsecore/pstream.c b/src/pulsecore/pstream.c
index 539c4a2fb..ceda72859 100644
--- a/src/pulsecore/pstream.c
+++ b/src/pulsecore/pstream.c
@@ -109,12 +109,23 @@ struct item_info {
uint32_t block_id;
};
+struct pstream_read {
+ pa_pstream_descriptor descriptor;
+ pa_memblock *memblock;
+ pa_packet *packet;
+ uint32_t shm_info[PA_PSTREAM_SHM_MAX];
+ void *data;
+ size_t index;
+};
+
struct pa_pstream {
PA_REFCNT_DECLARE;
pa_mainloop_api *mainloop;
pa_defer_event *defer_event;
pa_iochannel *io;
+ pa_srbchannel *srb, *srbpending;
+ bool is_srbpending;
pa_queue *send_queue;
@@ -132,14 +143,7 @@ struct pa_pstream {
pa_memchunk memchunk;
} write;
- struct {
- pa_pstream_descriptor descriptor;
- pa_memblock *memblock;
- pa_packet *packet;
- uint32_t shm_info[PA_PSTREAM_SHM_MAX];
- void *data;
- size_t index;
- } read;
+ struct pstream_read readio, readsrb;
bool use_shm;
pa_memimport *import;
@@ -172,7 +176,7 @@ struct pa_pstream {
};
static int do_write(pa_pstream *p);
-static int do_read(pa_pstream *p);
+static int do_read(pa_pstream *p, struct pstream_read *re);
static void do_pstream_read_write(pa_pstream *p) {
pa_assert(p);
@@ -182,8 +186,13 @@ static void do_pstream_read_write(pa_pstream *p) {
p->mainloop->defer_enable(p->defer_event, 0);
+ if (!p->dead && p->srb) {
+ do_write(p);
+ while (!p->dead && do_read(p, &p->readsrb) == 0);
+ }
+
if (!p->dead && pa_iochannel_is_readable(p->io)) {
- if (do_read(p) < 0)
+ if (do_read(p, &p->readio) < 0)
goto fail;
} else if (!p->dead && pa_iochannel_is_hungup(p->io))
goto fail;
@@ -208,6 +217,17 @@ fail:
pa_pstream_unref(p);
}
+static bool srb_callback(pa_srbchannel *srb, void *userdata) {
+ pa_pstream *p = userdata;
+
+ pa_assert(p);
+ pa_assert(PA_REFCNT_VALUE(p) > 0);
+ pa_assert(p->srb == srb);
+
+ do_pstream_read_write(p);
+ return p->srb != NULL;
+}
+
static void io_callback(pa_iochannel*io, void *userdata) {
pa_pstream *p = userdata;
@@ -289,11 +309,17 @@ static void pstream_free(pa_pstream *p) {
if (p->write.memchunk.memblock)
pa_memblock_unref(p->write.memchunk.memblock);
- if (p->read.memblock)
- pa_memblock_unref(p->read.memblock);
+ if (p->readsrb.memblock)
+ pa_memblock_unref(p->readsrb.memblock);
- if (p->read.packet)
- pa_packet_unref(p->read.packet);
+ if (p->readsrb.packet)
+ pa_packet_unref(p->readsrb.packet);
+
+ if (p->readio.memblock)
+ pa_memblock_unref(p->readio.memblock);
+
+ if (p->readio.packet)
+ pa_packet_unref(p->readio.packet);
pa_xfree(p);
}
@@ -556,6 +582,20 @@ static void prepare_next_write_item(pa_pstream *p) {
#endif
}
+static void check_srbpending(pa_pstream *p) {
+ if (!p->is_srbpending)
+ return;
+
+ if (p->srb)
+ pa_srbchannel_free(p->srb);
+
+ p->srb = p->srbpending;
+ p->is_srbpending = false;
+
+ if (p->srb)
+ pa_srbchannel_set_callback(p->srb, srb_callback, p);
+}
+
static int do_write(pa_pstream *p) {
void *d;
size_t l;
@@ -568,8 +608,11 @@ static int do_write(pa_pstream *p) {
if (!p->write.current)
prepare_next_write_item(p);
- if (!p->write.current)
+ if (!p->write.current) {
+ /* The out queue is empty, so switching channels is safe */
+ check_srbpending(p);
return 0;
+ }
if (p->write.minibuf_validsize > 0) {
d = p->write.minibuf + p->write.index;
@@ -606,8 +649,9 @@ static int do_write(pa_pstream *p) {
p->send_ancil_now = false;
} else
#endif
-
- if ((r = pa_iochannel_write(p->io, d, l)) < 0)
+ if (p->srb)
+ r = pa_srbchannel_write(p->srb, d, l);
+ else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
goto fail;
if (release_memblock)
@@ -639,7 +683,7 @@ fail:
return -1;
}
-static int do_read(pa_pstream *p) {
+static int do_read(pa_pstream *p, struct pstream_read *re) {
void *d;
size_t l;
ssize_t r;
@@ -647,23 +691,32 @@ static int do_read(pa_pstream *p) {
pa_assert(p);
pa_assert(PA_REFCNT_VALUE(p) > 0);
- if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
- d = (uint8_t*) p->read.descriptor + p->read.index;
- l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
+ if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
+ d = (uint8_t*) re->descriptor + re->index;
+ l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
} else {
- pa_assert(p->read.data || p->read.memblock);
+ pa_assert(re->data || re->memblock);
- if (p->read.data)
- d = p->read.data;
+ if (re->data)
+ d = re->data;
else {
- d = pa_memblock_acquire(p->read.memblock);
- release_memblock = p->read.memblock;
+ d = pa_memblock_acquire(re->memblock);
+ release_memblock = re->memblock;
}
- d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
- l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
+ d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
+ l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
+ if (re == &p->readsrb) {
+ r = pa_srbchannel_read(p->srb, d, l);
+ if (r == 0) {
+ if (release_memblock)
+ pa_memblock_release(release_memblock);
+ return 1;
+ }
+ }
+ else
#ifdef HAVE_CREDS
{
pa_ancil b;
@@ -689,13 +742,13 @@ static int do_read(pa_pstream *p) {
if (release_memblock)
pa_memblock_release(release_memblock);
- p->read.index += (size_t) r;
+ re->index += (size_t) r;
- if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
+ if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
uint32_t flags, length, channel;
/* Reading of frame descriptor complete */
- flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
@@ -706,10 +759,10 @@ static int do_read(pa_pstream *p) {
/* This is a SHM memblock release frame with no payload */
-/* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
pa_assert(p->export);
- pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+ pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
goto frame_done;
@@ -717,24 +770,24 @@ static int do_read(pa_pstream *p) {
/* This is a SHM memblock revoke frame with no payload */
-/* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
+/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
pa_assert(p->import);
- pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
+ pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
goto frame_done;
}
- length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
+ length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
return -1;
}
- pa_assert(!p->read.packet && !p->read.memblock);
+ pa_assert(!re->packet && !re->memblock);
- channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
+ channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
if (channel == (uint32_t) -1) {
@@ -744,8 +797,8 @@ static int do_read(pa_pstream *p) {
}
/* Frame is a packet frame */
- p->read.packet = pa_packet_new(length);
- p->read.data = p->read.packet->data;
+ re->packet = pa_packet_new(length);
+ re->data = re->packet->data;
} else {
@@ -756,20 +809,20 @@ static int do_read(pa_pstream *p) {
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
- if (length != sizeof(p->read.shm_info)) {
+ if (length != sizeof(re->shm_info)) {
pa_log_warn("Received SHM memblock frame with invalid frame length.");
return -1;
}
/* Frame is a memblock frame referencing an SHM memblock */
- p->read.data = p->read.shm_info;
+ re->data = re->shm_info;
} else if ((flags & PA_FLAG_SHMMASK) == 0) {
/* Frame is a memblock frame */
- p->read.memblock = pa_memblock_new(p->mempool, length);
- p->read.data = NULL;
+ re->memblock = pa_memblock_new(p->mempool, length);
+ re->data = NULL;
} else {
pa_log_warn("Received memblock frame with invalid flags value.");
@@ -777,74 +830,74 @@ static int do_read(pa_pstream *p) {
}
}
- } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
+ } else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
/* Frame payload available */
- if (p->read.memblock && p->receive_memblock_callback) {
+ if (re->memblock && p->receive_memblock_callback) {
/* Is this memblock data? Than pass it to the user */
- l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
+ l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
if (l > 0) {
pa_memchunk chunk;
- chunk.memblock = p->read.memblock;
- chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
+ chunk.memblock = re->memblock;
+ chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
chunk.length = l;
if (p->receive_memblock_callback) {
int64_t offset;
offset = (int64_t) (
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
p->receive_memblock_callback(
p,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->receive_memblock_callback_userdata);
}
/* Drop seek info for following callbacks */
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
- p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
+ re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
}
}
/* Frame complete */
- if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
+ if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
- if (p->read.memblock) {
+ if (re->memblock) {
/* This was a memblock frame. We can unref the memblock now */
- pa_memblock_unref(p->read.memblock);
+ pa_memblock_unref(re->memblock);
- } else if (p->read.packet) {
+ } else if (re->packet) {
if (p->receive_packet_callback)
#ifdef HAVE_CREDS
- p->receive_packet_callback(p, p->read.packet, &p->read_ancil, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata);
#else
- p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
+ p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
#endif
- pa_packet_unref(p->read.packet);
+ pa_packet_unref(re->packet);
} else {
pa_memblock *b;
- uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
+ uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
pa_assert(p->import);
if (!(b = pa_memimport_get(p->import,
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
- ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
+ ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
!!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
@@ -857,17 +910,17 @@ static int do_read(pa_pstream *p) {
chunk.memblock = b;
chunk.index = 0;
- chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
+ chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
offset = (int64_t) (
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
- (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
+ (((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
p->receive_memblock_callback(
p,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
offset,
- ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
+ ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
&chunk,
p->receive_memblock_callback_userdata);
}
@@ -883,10 +936,10 @@ static int do_read(pa_pstream *p) {
return 0;
frame_done:
- p->read.memblock = NULL;
- p->read.packet = NULL;
- p->read.index = 0;
- p->read.data = NULL;
+ re->memblock = NULL;
+ re->packet = NULL;
+ re->index = 0;
+ re->data = NULL;
#ifdef HAVE_CREDS
p->read_ancil.creds_valid = false;
@@ -988,6 +1041,9 @@ void pa_pstream_unlink(pa_pstream *p) {
p->dead = true;
+ while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
+ pa_pstream_set_srbchannel(p, NULL);
+
if (p->import) {
pa_memimport_free(p->import);
p->import = NULL;
@@ -1040,3 +1096,23 @@ bool pa_pstream_get_shm(pa_pstream *p) {
return p->use_shm;
}
+
+void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
+ pa_assert(p);
+ pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
+
+ if (srb == p->srb)
+ return;
+
+ /* We can't handle quick switches between srbchannels. */
+ pa_assert(!p->is_srbpending);
+
+ p->srbpending = srb;
+ p->is_srbpending = true;
+
+ /* Switch immediately, if possible. */
+ if (p->dead)
+ check_srbpending(p);
+ else
+ do_write(p);
+}
diff --git a/src/pulsecore/pstream.h b/src/pulsecore/pstream.h
index 4961570e4..8e4056cde 100644
--- a/src/pulsecore/pstream.h
+++ b/src/pulsecore/pstream.h
@@ -31,6 +31,7 @@
#include <pulsecore/packet.h>
#include <pulsecore/memblock.h>
#include <pulsecore/iochannel.h>
+#include <pulsecore/srbchannel.h>
#include <pulsecore/memchunk.h>
#include <pulsecore/creds.h>
#include <pulsecore/macro.h>
@@ -66,4 +67,8 @@ bool pa_pstream_is_pending(pa_pstream *p);
void pa_pstream_enable_shm(pa_pstream *p, bool enable);
bool pa_pstream_get_shm(pa_pstream *p);
+/* Enables shared ringbuffer channel. Note that the srbchannel is now owned by the pstream.
+ Setting srb to NULL will free any existing srbchannel. */
+void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb);
+
#endif