summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWim Taymans <wtaymans@redhat.com>2014-08-07 16:32:52 +0200
committerWim Taymans <wtaymans@redhat.com>2014-08-07 16:32:52 +0200
commitc0dd22a0511ba91173e21e8df13c966285ae3d61 (patch)
treeb8e2179b32bbf908a57cfa2afc0460cabd0692e8
parentc57f8422f3db44759454ab94a847f3a901412fa4 (diff)
headset: beginnings of headset module
-rw-r--r--src/Makefile.am12
-rw-r--r--src/modules/module-headset.c1093
2 files changed, 1103 insertions, 2 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 5924bd8ad..b615da631 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1082,7 +1082,8 @@ libavahi_wrap_la_LIBADD = $(AM_LIBADD) $(AVAHI_CFLAGS) libpulsecore-@PA_MAJORMIN
if HAVE_DBUS
# Serveral module (e.g. libalsa-util.la)
modlibexec_LTLIBRARIES += \
- module-console-kit.la
+ module-console-kit.la \
+ module-headset.la
endif
modlibexec_LTLIBRARIES += \
@@ -1471,7 +1472,8 @@ SYMDEF_FILES = \
module-switch-on-connect-symdef.h \
module-switch-on-port-available-symdef.h \
module-filter-apply-symdef.h \
- module-filter-heuristics-symdef.h
+ module-filter-heuristics-symdef.h \
+ module-headset-symdef.h
if HAVE_ESOUND
SYMDEF_FILES += \
@@ -2117,6 +2119,12 @@ module_rygel_media_server_la_LDFLAGS = $(MODULE_LDFLAGS)
module_rygel_media_server_la_LIBADD = $(MODULE_LIBADD) $(DBUS_LIBS) libprotocol-http.la
module_rygel_media_server_la_CFLAGS = $(AM_CFLAGS) $(DBUS_CFLAGS)
+# Headset
+module_headset_la_SOURCES = modules/module-headset.c
+module_headset_la_LDFLAGS = $(MODULE_LDFLAGS)
+module_headset_la_LIBADD = $(MODULE_LIBADD) $(DBUS_LIBS)
+module_headset_la_CFLAGS = $(AM_CFLAGS) $(DBUS_CFLAGS)
+
###################################
# Some minor stuff #
###################################
diff --git a/src/modules/module-headset.c b/src/modules/module-headset.c
new file mode 100644
index 000000000..097ba3e17
--- /dev/null
+++ b/src/modules/module-headset.c
@@ -0,0 +1,1093 @@
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2014 Wim Taymans <wtaymans@redhat.com>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/types.h>
+
+#include <pulse/rtclock.h>
+#include <pulse/timeval.h>
+#include <pulse/xmalloc.h>
+
+#include <pulsecore/dbus-shared.h>
+#include <pulsecore/core-error.h>
+#include <pulsecore/core-util.h>
+#include <pulsecore/core-rtclock.h>
+#include <pulsecore/hashmap.h>
+#include <pulsecore/idxset.h>
+#include <pulsecore/log.h>
+#include <pulsecore/modargs.h>
+#include <pulsecore/module.h>
+#include <pulsecore/poll.h>
+#include <pulsecore/rtpoll.h>
+#include <pulsecore/socket-util.h>
+#include <pulsecore/thread.h>
+#include <pulsecore/time-smoother.h>
+
+#include "module-headset-symdef.h"
+
+PA_MODULE_AUTHOR("Wim Taymans");
+PA_MODULE_DESCRIPTION("Create cards for each Headset");
+PA_MODULE_VERSION(PACKAGE_VERSION);
+PA_MODULE_LOAD_ONCE(true);
+
+static const char* const valid_modargs[] = {
+ NULL
+};
+
+struct userdata {
+ pa_module *module;
+ pa_core *core;
+ pa_dbus_connection *connection;
+ bool filter_added;
+
+ pa_hashmap *devices;
+};
+
+typedef struct pa_headset {
+ struct userdata *userdata;
+
+ char *path;
+ pa_card *card;
+ pa_sink *sink;
+ pa_source *source;
+
+ pa_thread *thread;
+ pa_thread_mq thread_mq;
+ pa_rtpoll *rtpoll;
+ pa_rtpoll_item *rtpoll_item;
+
+ bool connected;
+ int stream_fd;
+ size_t read_link_mtu;
+ size_t write_link_mtu;
+ pa_sample_spec sample_spec;
+
+ size_t read_block_size;
+ size_t write_block_size;
+ uint64_t read_index;
+ uint64_t write_index;
+ pa_usec_t started_at;
+ pa_smoother *read_smoother;
+ pa_memchunk write_memchunk;
+ int stream_write_type;
+} pa_headset;
+
+#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC)
+#define FIXED_LATENCY_PLAYBACK (25 * PA_USEC_PER_MSEC)
+#define FIXED_LATENCY_RECORD (25 * PA_USEC_PER_MSEC)
+
+/* Run from I/O thread */
+static void setup_connection(pa_headset *h) {
+ struct pollfd *pollfd;
+ int one;
+
+ pa_log_info("Device %s resuming", h->path);
+
+ h->read_block_size = h->read_link_mtu;
+ h->write_block_size = h->write_link_mtu;
+
+ if (h->sink) {
+ pa_sink_set_max_request_within_thread(h->sink, h->write_block_size);
+ pa_sink_set_fixed_latency_within_thread(h->sink,
+ FIXED_LATENCY_PLAYBACK +
+ pa_bytes_to_usec(h->write_block_size, &h->sample_spec));
+ }
+
+ if (h->source)
+ pa_source_set_fixed_latency_within_thread(h->source,
+ FIXED_LATENCY_RECORD +
+ pa_bytes_to_usec(h->read_block_size, &h->sample_spec));
+
+ pa_make_fd_nonblock(h->stream_fd);
+ pa_make_socket_low_delay(h->stream_fd);
+
+ one = 1;
+ if (setsockopt(h->stream_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0)
+ pa_log_warn("Failed to enable SO_TIMESTAMP: %s", pa_cstrerror(errno));
+
+ pa_log_debug("Device connection set up");
+
+ h->rtpoll_item = pa_rtpoll_item_new(h->rtpoll, PA_RTPOLL_NEVER, 1);
+ pollfd = pa_rtpoll_item_get_pollfd(h->rtpoll_item, NULL);
+ pollfd->fd = h->stream_fd;
+ pollfd->events = pollfd->revents = 0;
+
+ h->read_index = h->write_index = 0;
+ h->started_at = 0;
+
+ if (h->source)
+ h->read_smoother = pa_smoother_new(PA_USEC_PER_SEC, 2*PA_USEC_PER_SEC, true, true, 10, pa_rtclock_now(), true);
+}
+
+static void teardown_connection(pa_headset *h) {
+ if (h->rtpoll_item) {
+ pa_rtpoll_item_free(h->rtpoll_item);
+ h->rtpoll_item = NULL;
+ }
+
+ if (h->stream_fd >= 0) {
+ pa_close(h->stream_fd);
+ h->stream_fd = -1;
+ }
+
+ if (h->read_smoother) {
+ pa_smoother_free(h->read_smoother);
+ h->read_smoother = NULL;
+ }
+
+ if (h->write_memchunk.memblock) {
+ pa_memblock_unref(h->write_memchunk.memblock);
+ pa_memchunk_reset(&h->write_memchunk);
+ }
+ pa_log_debug("Device connection torn down");
+}
+
+
+static int headset_connect(pa_headset *h) {
+ if (h->connected)
+ return 0;
+
+ pa_log_debug("Connecting to device %s", h->path);
+
+
+#if 0
+ h->stream_fd = u->transport->acquire(u->transport, &u->read_link_mtu, &u->write_link_mtu);
+ if (h->stream_fd < 0)
+ return -1;
+#endif
+
+ h->read_link_mtu = 48;
+ h->write_link_mtu = 48;
+
+ h->connected = true;
+ pa_log_info("Device %s connected: fd %d", h->path, h->stream_fd);
+
+ return 0;
+}
+
+static void headset_disconnect(pa_headset *h) {
+ /* Ignore if already disconnected */
+ if (!h->connected)
+ return;
+
+ pa_log_debug("Disconnecting device %s", h->path);
+
+#if 0
+ u->transport->release(u->transport);
+#endif
+
+ h->connected = false;
+
+ teardown_connection(h);
+}
+
+/* Run from IO thread */
+static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ pa_headset *h = PA_SOURCE(o)->userdata;
+ bool failed = false;
+ int r;
+
+ pa_assert(h->source == PA_SOURCE(o));
+
+ switch (code) {
+
+ case PA_SOURCE_MESSAGE_SET_STATE:
+
+ switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) {
+
+ case PA_SOURCE_SUSPENDED:
+ /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */
+ if (!PA_SOURCE_IS_OPENED(h->source->thread_info.state))
+ break;
+
+ /* Stop the device if the sink is suspended as well */
+ if (!h->sink || h->sink->state == PA_SINK_SUSPENDED)
+ headset_disconnect(h);
+
+ if (h->read_smoother)
+ pa_smoother_pause(h->read_smoother, pa_rtclock_now());
+
+ break;
+
+ case PA_SOURCE_IDLE:
+ case PA_SOURCE_RUNNING:
+ if (h->source->thread_info.state != PA_SOURCE_SUSPENDED)
+ break;
+
+ /* Resume the device if the sink was suspended as well */
+ if (!h->sink || !PA_SINK_IS_OPENED(h->sink->thread_info.state)) {
+ if (headset_connect(h) < 0)
+ failed = true;
+ else
+ setup_connection(h);
+ }
+
+ /* We don't resume the smoother here. Instead we
+ * wait until the first packet arrives */
+
+ break;
+
+ case PA_SOURCE_UNLINKED:
+ case PA_SOURCE_INIT:
+ case PA_SOURCE_INVALID_STATE:
+ break;
+ }
+
+ break;
+
+ case PA_SOURCE_MESSAGE_GET_LATENCY: {
+ pa_usec_t wi, ri;
+
+ if (h->read_smoother) {
+ wi = pa_smoother_get(h->read_smoother, pa_rtclock_now());
+ ri = pa_bytes_to_usec(h->read_index, &h->sample_spec);
+
+ *((pa_usec_t*) data) = FIXED_LATENCY_RECORD + wi > ri ? FIXED_LATENCY_RECORD + wi - ri : 0;
+ } else
+ *((pa_usec_t*) data) = 0;
+
+ return 0;
+ }
+
+ }
+
+ r = pa_source_process_msg(o, code, data, offset, chunk);
+
+ return (r < 0 || !failed) ? r : -1;
+}
+
+/* Run from main thread */
+static int add_source(pa_headset *h) {
+ pa_source_new_data data;
+ struct userdata *u = h->userdata;
+
+ pa_source_new_data_init(&data);
+ data.module = u->module;
+ data.card = h->card;
+ data.driver = __FILE__;
+ data.name = pa_sprintf_malloc("headset_source.%s", h->path);
+ data.namereg_fail = false;
+ pa_source_new_data_set_sample_spec(&data, &h->sample_spec);
+
+ h->source = pa_source_new(u->core, &data, PA_SOURCE_HARDWARE|PA_SOURCE_LATENCY);
+ pa_source_new_data_done(&data);
+ if (!h->source) {
+ pa_log_error("Failed to create source");
+ return -1;
+ }
+
+ h->source->userdata = h;
+ h->source->parent.process_msg = source_process_msg;
+
+ return 0;
+}
+
+/* Run from IO thread */
+static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ pa_headset *h = PA_SINK(o)->userdata;
+ bool failed = false;
+ int r;
+
+ pa_assert(h->sink == PA_SINK(o));
+
+ switch (code) {
+
+ case PA_SINK_MESSAGE_SET_STATE:
+
+ switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) {
+
+ case PA_SINK_SUSPENDED:
+ /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */
+ if (!PA_SINK_IS_OPENED(h->sink->thread_info.state))
+ break;
+
+ /* Stop the device if the source is suspended as well */
+ if (!h->source || h->source->state == PA_SOURCE_SUSPENDED)
+ /* We deliberately ignore whether disconnect
+ * actually worked. Since the stream_fd is
+ * closed it doesn't really matter */
+ headset_disconnect(h);
+
+ break;
+
+ case PA_SINK_IDLE:
+ case PA_SINK_RUNNING:
+ if (h->sink->thread_info.state != PA_SINK_SUSPENDED)
+ break;
+
+ /* Resume the device if the source was suspended as well */
+ if (!h->source || !PA_SOURCE_IS_OPENED(h->source->thread_info.state)) {
+ if (headset_connect(h) < 0)
+ failed = true;
+ else
+ setup_connection(h);
+ }
+
+ break;
+
+ case PA_SINK_UNLINKED:
+ case PA_SINK_INIT:
+ case PA_SINK_INVALID_STATE:
+ break;
+ }
+
+ break;
+
+ case PA_SINK_MESSAGE_GET_LATENCY: {
+ pa_usec_t wi, ri;
+
+ if (h->read_smoother) {
+ ri = pa_smoother_get(h->read_smoother, pa_rtclock_now());
+ wi = pa_bytes_to_usec(h->write_index + h->write_block_size, &h->sample_spec);
+ } else {
+ ri = pa_rtclock_now() - h->started_at;
+ wi = pa_bytes_to_usec(h->write_index, &h->sample_spec);
+ }
+
+ *((pa_usec_t*) data) = FIXED_LATENCY_PLAYBACK + wi > ri ? FIXED_LATENCY_PLAYBACK + wi - ri : 0;
+
+ return 0;
+ }
+ }
+
+ r = pa_sink_process_msg(o, code, data, offset, chunk);
+
+ return (r < 0 || !failed) ? r : -1;
+}
+
+/* Run from main thread */
+static int add_sink(pa_headset *h) {
+ pa_sink_new_data data;
+ struct userdata *u = h->userdata;
+
+ pa_sink_new_data_init(&data);
+ data.module = u->module;
+ data.card = h->card;
+ data.driver = __FILE__;
+ data.name = pa_sprintf_malloc("headset_sink.%s", h->path);
+ data.namereg_fail = false;
+ pa_sink_new_data_set_sample_spec(&data, &h->sample_spec);
+
+ h->sink = pa_sink_new(u->core, &data, PA_SINK_HARDWARE|PA_SINK_LATENCY);
+ pa_sink_new_data_done(&data);
+ if (!h->sink) {
+ pa_log_error("Failed to create sink");
+ return -1;
+ }
+
+ h->sink->userdata = h;
+ h->sink->parent.process_msg = sink_process_msg;
+
+ return 0;
+}
+
+static int sco_process_render(pa_headset *h) {
+ int ret = 0;
+
+ pa_assert(h);
+ pa_assert(h->sink);
+
+ /* First, render some data */
+ if (!h->write_memchunk.memblock)
+ pa_sink_render_full(h->sink, h->write_block_size, &h->write_memchunk);
+
+ pa_assert(h->write_memchunk.length == h->write_block_size);
+
+ for (;;) {
+ ssize_t l;
+ const void *p;
+
+ /* Now write that data to the socket. The socket is of type
+ * SEQPACKET, and we generated the data of the MTU size, so this
+ * should just work. */
+
+ p = (const uint8_t *) pa_memblock_acquire_chunk(&h->write_memchunk);
+ l = pa_write(h->stream_fd, p, h->write_memchunk.length, &h->stream_write_type);
+ pa_memblock_release(h->write_memchunk.memblock);
+
+ pa_assert(l != 0);
+
+ if (l < 0) {
+
+ if (errno == EINTR)
+ /* Retry right away if we got interrupted */
+ continue;
+ else if (errno == EAGAIN)
+ /* Hmm, apparently the socket was not writable, give up for now */
+ break;
+
+ pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno));
+ ret = -1;
+ break;
+ }
+
+ pa_assert((size_t) l <= h->write_memchunk.length);
+
+ if ((size_t) l != h->write_memchunk.length) {
+ pa_log_error("Wrote memory block to socket only partially! %llu written, wanted to write %llu.",
+ (unsigned long long) l,
+ (unsigned long long) h->write_memchunk.length);
+ ret = -1;
+ break;
+ }
+
+ h->write_index += (uint64_t) h->write_memchunk.length;
+ pa_memblock_unref(h->write_memchunk.memblock);
+ pa_memchunk_reset(&h->write_memchunk);
+
+ ret = 1;
+ break;
+ }
+
+ return ret;
+}
+
+/* Run from IO thread */
+static int sco_process_push(pa_headset *h) {
+ int ret = 0;
+ pa_memchunk memchunk;
+ struct userdata *u;
+
+ pa_assert(h);
+ pa_assert(h->source);
+ pa_assert(h->read_smoother);
+
+ u = h->userdata;
+
+ memchunk.memblock = pa_memblock_new(u->core->mempool, h->read_block_size);
+ memchunk.index = memchunk.length = 0;
+
+ for (;;) {
+ ssize_t l;
+ void *p;
+ struct msghdr m;
+ struct cmsghdr *cm;
+ uint8_t aux[1024];
+ struct iovec iov;
+ bool found_tstamp = false;
+ pa_usec_t tstamp;
+
+ memset(&m, 0, sizeof(m));
+ memset(&aux, 0, sizeof(aux));
+ memset(&iov, 0, sizeof(iov));
+
+ m.msg_iov = &iov;
+ m.msg_iovlen = 1;
+ m.msg_control = aux;
+ m.msg_controllen = sizeof(aux);
+
+ p = pa_memblock_acquire(memchunk.memblock);
+ iov.iov_base = p;
+ iov.iov_len = pa_memblock_get_length(memchunk.memblock);
+ l = recvmsg(h->stream_fd, &m, 0);
+ pa_memblock_release(memchunk.memblock);
+
+ if (l <= 0) {
+
+ if (l < 0 && errno == EINTR)
+ /* Retry right away if we got interrupted */
+ continue;
+
+ else if (l < 0 && errno == EAGAIN)
+ /* Hmm, apparently the socket was not readable, give up for now. */
+ break;
+
+ pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF");
+ ret = -1;
+ break;
+ }
+ pa_assert((size_t) l <= pa_memblock_get_length(memchunk.memblock));
+
+ /* In some rare occasions, we might receive packets of a very strange
+ * size. This could potentially be possible if the SCO packet was
+ * received partially over-the-air, or more probably due to hardware
+ * issues in our Bluetooth adapter. In these cases, in order to avoid
+ * an assertion failure due to unaligned data, just discard the whole
+ * packet */
+ if (!pa_frame_aligned(l, &h->sample_spec)) {
+ pa_log_warn("SCO packet received of unaligned size: %zu", l);
+ break;
+ }
+
+ memchunk.length = (size_t) l;
+ h->read_index += (uint64_t) l;
+
+ for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm))
+ if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) {
+ struct timeval *tv = (struct timeval*) CMSG_DATA(cm);
+ pa_rtclock_from_wallclock(tv);
+ tstamp = pa_timeval_load(tv);
+ found_tstamp = true;
+ break;
+ }
+
+ if (!found_tstamp) {
+ pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!");
+ tstamp = pa_rtclock_now();
+ }
+
+ pa_smoother_put(h->read_smoother, tstamp, pa_bytes_to_usec(h->read_index, &h->sample_spec));
+ pa_smoother_resume(h->read_smoother, tstamp, true);
+
+ pa_source_post(h->source, &memchunk);
+
+ ret = l;
+ break;
+ }
+
+ pa_memblock_unref(memchunk.memblock);
+
+ return ret;
+}
+
+/* I/O thread function */
+static void thread_func(void *userdata) {
+ pa_headset *h = userdata;
+ struct userdata *u = h->userdata;
+ unsigned do_write = 0;
+ unsigned pending_read_bytes = 0;
+ bool writable = false;
+
+ pa_assert(h);
+
+ pa_log_debug("IO Thread starting up");
+
+ if (u->core->realtime_scheduling)
+ pa_make_realtime(u->core->realtime_priority);
+
+ pa_thread_mq_install(&h->thread_mq);
+
+ /* Setup the stream only if the device was already connected */
+ if (h->connected)
+ setup_connection(h);
+
+ for (;;) {
+ struct pollfd *pollfd;
+ int ret;
+ bool disable_timer = true;
+
+ pollfd = h->rtpoll_item ? pa_rtpoll_item_get_pollfd(h->rtpoll_item, NULL) : NULL;
+
+ if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
+ pa_log_info("FD error: %s%s%s%s",
+ pollfd->revents & POLLERR ? "POLLERR " :"",
+ pollfd->revents & POLLHUP ? "POLLHUP " :"",
+ pollfd->revents & POLLPRI ? "POLLPRI " :"",
+ pollfd->revents & POLLNVAL ? "POLLNVAL " :"");
+
+ if (pollfd->revents & POLLHUP) {
+ pollfd = NULL;
+ teardown_connection(h);
+ do_write = 0;
+ pending_read_bytes = 0;
+ writable = false;
+ } else
+ goto fail;
+ }
+
+ if (h->source && PA_SOURCE_IS_LINKED(h->source->thread_info.state)) {
+
+ /* We should send two blocks to the device before we expect
+ * a response. */
+
+ if (h->write_index == 0 && h->read_index <= 0)
+ do_write = 2;
+
+ if (pollfd && (pollfd->revents & POLLIN)) {
+ int n_read;
+
+ n_read = sco_process_push(h);
+
+ if (n_read < 0)
+ goto fail;
+
+ if (n_read > 0) {
+ /* We just read something, so we are supposed to write something, too */
+ pending_read_bytes += n_read;
+ do_write += pending_read_bytes / h->write_block_size;
+ pending_read_bytes = pending_read_bytes % h->write_block_size;
+ }
+ }
+ }
+
+ if (h->sink && PA_SINK_IS_LINKED(h->sink->thread_info.state)) {
+
+ if (PA_UNLIKELY(h->sink->thread_info.rewind_requested))
+ pa_sink_process_rewind(h->sink, 0);
+
+ if (pollfd) {
+ if (pollfd->revents & POLLOUT)
+ writable = true;
+
+ if ((!h->source || !PA_SOURCE_IS_LINKED(h->source->thread_info.state)) && do_write <= 0 && writable) {
+ pa_usec_t time_passed;
+ pa_usec_t audio_sent;
+
+ /* Hmm, there is no input stream we could synchronize
+ * to. So let's do things by time */
+
+ time_passed = pa_rtclock_now() - h->started_at;
+ audio_sent = pa_bytes_to_usec(h->write_index, &h->sample_spec);
+
+ if (audio_sent <= time_passed) {
+ pa_usec_t audio_to_send = time_passed - audio_sent;
+
+ /* Never try to catch up for more than 100ms */
+ if (h->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) {
+ pa_usec_t skip_usec;
+ uint64_t skip_bytes;
+
+ skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC;
+ skip_bytes = pa_usec_to_bytes(skip_usec, &h->sample_spec);
+
+ if (skip_bytes > 0) {
+ pa_memchunk tmp;
+
+ pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
+ (unsigned long long) skip_usec,
+ (unsigned long long) skip_bytes);
+
+ pa_sink_render_full(h->sink, skip_bytes, &tmp);
+ pa_memblock_unref(tmp.memblock);
+ h->write_index += skip_bytes;
+ }
+ }
+
+ do_write = 1;
+ pending_read_bytes = 0;
+ }
+ }
+
+ if (writable && do_write > 0) {
+ int n_written;
+
+ if (h->write_index <= 0)
+ h->started_at = pa_rtclock_now();
+
+ if ((n_written = sco_process_render(h)) < 0)
+ goto fail;
+
+ if (n_written == 0)
+ pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
+
+ do_write -= n_written;
+ writable = false;
+ }
+
+ if ((!h->source || !PA_SOURCE_IS_LINKED(h->source->thread_info.state)) && do_write <= 0) {
+ pa_usec_t sleep_for;
+ pa_usec_t time_passed, next_write_at;
+
+ if (writable) {
+ /* Hmm, there is no input stream we could synchronize
+ * to. So let's estimate when we need to wake up the latest */
+ time_passed = pa_rtclock_now() - h->started_at;
+ next_write_at = pa_bytes_to_usec(h->write_index, &h->sample_spec);
+ sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
+ /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
+ } else
+ /* drop stream every 500 ms */
+ sleep_for = PA_USEC_PER_MSEC * 500;
+
+ pa_rtpoll_set_timer_relative(h->rtpoll, sleep_for);
+ disable_timer = false;
+ }
+ }
+ }
+
+ if (disable_timer)
+ pa_rtpoll_set_timer_disabled(h->rtpoll);
+
+ /* Hmm, nothing to do. Let's sleep */
+ if (pollfd)
+ pollfd->events = (short) (((h->sink && PA_SINK_IS_LINKED(h->sink->thread_info.state) && !writable) ? POLLOUT : 0) |
+ (h->source && PA_SOURCE_IS_LINKED(h->source->thread_info.state) ? POLLIN : 0));
+
+ if ((ret = pa_rtpoll_run(h->rtpoll, true)) < 0) {
+ pa_log_debug("pa_rtpoll_run failed with: %d", ret);
+ goto fail;
+ }
+ if (ret == 0) {
+ pa_log_debug("IO thread shutdown requested, stopping cleanly");
+ headset_disconnect(h);
+ goto finish;
+ }
+ }
+
+fail:
+ /* If this was no regular exit from the loop we have to continue processing messages until we receive PA_MESSAGE_SHUTDOWN */
+ pa_log_debug("IO thread failed");
+ pa_asyncmsgq_wait_for(h->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
+
+finish:
+ pa_log_debug("IO thread shutting down");
+}
+
+/* Run from main thread */
+static int start_thread(pa_headset *h) {
+ struct userdata *u;
+
+ pa_assert(h);
+ pa_assert(!h->thread);
+ pa_assert(!h->rtpoll);
+ pa_assert(!h->rtpoll_item);
+
+ u = h->userdata;
+
+ h->rtpoll = pa_rtpoll_new();
+ pa_thread_mq_init(&h->thread_mq, u->core->mainloop, h->rtpoll);
+
+ if (!(h->thread = pa_thread_new("headset", thread_func, h))) {
+ pa_log_error("Failed to create IO thread");
+ return -1;
+ }
+
+ if (h->sink) {
+ pa_sink_set_asyncmsgq(h->sink, h->thread_mq.inq);
+ pa_sink_set_rtpoll(h->sink, h->rtpoll);
+ pa_sink_put(h->sink);
+
+ if (h->sink->set_volume)
+ h->sink->set_volume(h->sink);
+ }
+
+ if (h->source) {
+ pa_source_set_asyncmsgq(h->source, h->thread_mq.inq);
+ pa_source_set_rtpoll(h->source, h->rtpoll);
+ pa_source_put(h->source);
+
+ if (h->source->set_volume)
+ h->source->set_volume(h->source);
+ }
+
+ return 0;
+}
+
+/* Run from main thread */
+static void stop_thread(pa_headset *h) {
+ pa_assert(h);
+
+ if (h->sink)
+ pa_sink_unlink(h->sink);
+
+ if (h->source)
+ pa_source_unlink(h->source);
+
+ if (h->thread) {
+ pa_asyncmsgq_send(h->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
+ pa_thread_free(h->thread);
+ h->thread = NULL;
+ }
+
+ if (h->rtpoll_item) {
+ pa_rtpoll_item_free(h->rtpoll_item);
+ h->rtpoll_item = NULL;
+ }
+
+ if (h->rtpoll) {
+ pa_thread_mq_done(&h->thread_mq);
+ pa_rtpoll_free(h->rtpoll);
+ h->rtpoll = NULL;
+ }
+
+ headset_disconnect(h);
+
+ if (h->sink) {
+ pa_sink_unref(h->sink);
+ h->sink = NULL;
+ }
+
+ if (h->source) {
+ pa_source_unref(h->source);
+ h->source = NULL;
+ }
+
+ if (h->read_smoother) {
+ pa_smoother_free(h->read_smoother);
+ h->read_smoother = NULL;
+ }
+}
+
+static int card_set_profile(pa_card *c, pa_card_profile *new_profile) {
+ return 0;
+}
+
+static void add_device(struct userdata *u, const char *path, DBusMessageIter *props) {
+ pa_headset *h;
+ pa_card_new_data card_new_data;
+ pa_card_profile *p;
+
+ pa_log_debug("Device %s added", path);
+
+ h = pa_xnew0(pa_headset, 1);
+ h->path = pa_xstrdup (path);
+ h->userdata = u;
+
+ /* make a card */
+ pa_card_new_data_init(&card_new_data);
+ pa_proplist_sets(card_new_data.proplist, PA_PROP_DEVICE_STRING, path);
+ card_new_data.driver = __FILE__;
+ pa_card_new_data_set_name(&card_new_data, path);
+
+ /* add on profile */
+ p = pa_card_profile_new("on", _("On"), 0);
+ pa_hashmap_put(card_new_data.profiles, p->name, p);
+
+ /* create the card object */
+ h->card = pa_card_new(u->core, &card_new_data);
+ if (!h->card) {
+ pa_log("Unable to create card.\n");
+ return;
+ }
+
+ pa_card_new_data_done(&card_new_data);
+
+ h->card->userdata = h;
+ h->card->set_profile = card_set_profile;
+
+ h->sample_spec.format = PA_SAMPLE_S16LE;
+ h->sample_spec.channels = 1;
+ h->sample_spec.rate = 8000;
+
+ if (add_sink(h) < 0) {
+ pa_log("Unable to add sink\n");
+ return;
+ }
+
+ if (add_source(h) < 0) {
+ pa_log("Unable to add source\n");
+ return;
+ }
+
+ if (start_thread(h) < 0) {
+ pa_log("Unable to start thread\n");
+ return;
+ }
+ pa_assert_se(pa_hashmap_put(u->devices, h->path, h) >= 0);
+}
+
+static void remove_device(struct userdata *u, const char *path) {
+ pa_headset *h;
+
+ pa_log_debug("Device %s removed", path);
+
+ if (!(h = pa_hashmap_get(u->devices, path))) {
+ pa_log_error("Asked to remove unknown device %s", path);
+ return;
+ }
+
+ stop_thread(h);
+
+ if (h->card)
+ pa_card_free(h->card);
+
+ pa_xfree(h->path);
+ pa_xfree(h);
+}
+
+static DBusHandlerResult filter_cb(DBusConnection *bus, DBusMessage *message, void *userdata) {
+ struct userdata *u = userdata;
+ DBusError error;
+ const char *path;
+
+ pa_assert(bus);
+ pa_assert(message);
+ pa_assert(u);
+
+ dbus_error_init(&error);
+
+ if (dbus_message_is_signal(message, "org.freedesktop.HeadsetManager", "DeviceAdded")) {
+
+ if (!dbus_message_get_args(message, &error, DBUS_TYPE_OBJECT_PATH, &path, DBUS_TYPE_INVALID)) {
+ pa_log_error("Failed to parse DeviceAdded signal: %s: %s", error.name, error.message);
+ goto finish;
+ }
+
+ add_device(u, path, NULL);
+
+ } else if (dbus_message_is_signal(message, "org.freedesktop.HeadsetManager", "DeviceRemoved")) {
+
+ if (!dbus_message_get_args(message, &error, DBUS_TYPE_OBJECT_PATH, &path, DBUS_TYPE_INVALID)) {
+ pa_log_error("Failed to parse DeviceRemoved signal: %s: %s", error.name, error.message);
+ goto finish;
+ }
+
+ remove_device(u, path);
+ }
+
+finish:
+ dbus_error_free(&error);
+
+ return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
+}
+
+static int get_headset_list(struct userdata *u) {
+ DBusError error;
+ DBusMessage *m = NULL, *reply = NULL;
+ DBusMessageIter iter, devices_i;
+ int ret = -1;
+
+ pa_assert(u);
+
+ dbus_error_init(&error);
+
+ if (!(m = dbus_message_new_method_call("org.freedesktop.Headset", "/", "org.freedesktop.HeadsetManager", "GetDevices"))) {
+ pa_log("Failed to allocate GetDevices() method call.");
+ goto fail;
+ }
+
+ if (!(reply = dbus_connection_send_with_reply_and_block(pa_dbus_connection_get(u->connection), m, -1, &error))) {
+ pa_log("GetDevices() call failed: %s: %s", error.name, error.message);
+ goto fail;
+ }
+
+ if (!dbus_message_iter_init(reply, &iter) || !pa_streq(dbus_message_get_signature(reply), "a(oa{sv})")) {
+ pa_log("Invalid GetDevices() reply signature: %s", dbus_message_get_signature(reply));
+ goto fail;
+ }
+
+ dbus_message_iter_recurse(&iter, &devices_i);
+
+ while (dbus_message_iter_get_arg_type(&devices_i) == DBUS_TYPE_DICT_ENTRY) {
+ DBusMessageIter device_i, props_i;
+ const char *path;
+
+ dbus_message_iter_recurse(&devices_i, &device_i);
+
+ pa_assert(dbus_message_iter_get_arg_type(&device_i) == DBUS_TYPE_STRING);
+ dbus_message_iter_get_basic(&device_i, &path);
+
+ pa_assert_se(dbus_message_iter_next(&device_i));
+ pa_assert(dbus_message_iter_get_arg_type(&device_i) == DBUS_TYPE_ARRAY);
+
+ dbus_message_iter_recurse(&device_i, &props_i);
+
+ add_device (u, path, &props_i);
+
+ dbus_message_iter_next(&devices_i);
+ }
+ ret = 0;
+
+fail:
+
+ if (m)
+ dbus_message_unref(m);
+
+ if (reply)
+ dbus_message_unref(reply);
+
+ dbus_error_free(&error);
+
+ return ret;
+}
+
+int pa__init(pa_module*m) {
+ DBusError error;
+ pa_dbus_connection *connection;
+ struct userdata *u = NULL;
+ pa_modargs *ma;
+
+ pa_assert(m);
+
+ dbus_error_init(&error);
+
+ if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
+ pa_log("Failed to parse module arguments");
+ goto fail;
+ }
+
+ if (!(connection = pa_dbus_bus_get(m->core, DBUS_BUS_SYSTEM, &error)) || dbus_error_is_set(&error)) {
+
+ if (connection)
+ pa_dbus_connection_unref(connection);
+
+ pa_log_error("Unable to contact D-Bus system bus: %s: %s", error.name, error.message);
+ goto fail;
+ }
+
+ m->userdata = u = pa_xnew0(struct userdata, 1);
+ u->core = m->core;
+ u->module = m;
+ u->connection = connection;
+ u->devices = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, NULL);
+
+ if (!dbus_connection_add_filter(pa_dbus_connection_get(connection), filter_cb, u, NULL)) {
+ pa_log_error("Failed to add filter function");
+ goto fail;
+ }
+
+ u->filter_added = true;
+
+ if (pa_dbus_add_matches(
+ pa_dbus_connection_get(connection), &error,
+ "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceAdded'",
+ "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceRemoved'", NULL) < 0) {
+ pa_log_error("Unable to subscribe to Headset signals: %s: %s", error.name, error.message);
+ goto fail;
+ }
+
+ if (get_headset_list(u) < 0)
+ goto fail;
+
+ pa_modargs_free(ma);
+
+ return 0;
+
+fail:
+ if (ma)
+ pa_modargs_free(ma);
+
+ dbus_error_free(&error);
+ pa__done(m);
+
+ return -1;
+}
+
+void pa__done(pa_module *m) {
+ struct userdata *u;
+
+ pa_assert(m);
+
+ if (!(u = m->userdata))
+ return;
+
+ if (u->connection) {
+ pa_dbus_remove_matches(
+ pa_dbus_connection_get(u->connection),
+ "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceAdded'",
+ "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceRemoved'", NULL) ;
+
+ if (u->filter_added)
+ dbus_connection_remove_filter(pa_dbus_connection_get(u->connection), filter_cb, u);
+
+ pa_dbus_connection_unref(u->connection);
+ }
+
+ pa_xfree(u);
+}