diff options
author | Wim Taymans <wtaymans@redhat.com> | 2014-08-07 16:32:52 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2014-08-07 16:32:52 +0200 |
commit | c0dd22a0511ba91173e21e8df13c966285ae3d61 (patch) | |
tree | b8e2179b32bbf908a57cfa2afc0460cabd0692e8 | |
parent | c57f8422f3db44759454ab94a847f3a901412fa4 (diff) |
headset: beginnings of headset module
-rw-r--r-- | src/Makefile.am | 12 | ||||
-rw-r--r-- | src/modules/module-headset.c | 1093 |
2 files changed, 1103 insertions, 2 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 5924bd8ad..b615da631 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1082,7 +1082,8 @@ libavahi_wrap_la_LIBADD = $(AM_LIBADD) $(AVAHI_CFLAGS) libpulsecore-@PA_MAJORMIN if HAVE_DBUS # Serveral module (e.g. libalsa-util.la) modlibexec_LTLIBRARIES += \ - module-console-kit.la + module-console-kit.la \ + module-headset.la endif modlibexec_LTLIBRARIES += \ @@ -1471,7 +1472,8 @@ SYMDEF_FILES = \ module-switch-on-connect-symdef.h \ module-switch-on-port-available-symdef.h \ module-filter-apply-symdef.h \ - module-filter-heuristics-symdef.h + module-filter-heuristics-symdef.h \ + module-headset-symdef.h if HAVE_ESOUND SYMDEF_FILES += \ @@ -2117,6 +2119,12 @@ module_rygel_media_server_la_LDFLAGS = $(MODULE_LDFLAGS) module_rygel_media_server_la_LIBADD = $(MODULE_LIBADD) $(DBUS_LIBS) libprotocol-http.la module_rygel_media_server_la_CFLAGS = $(AM_CFLAGS) $(DBUS_CFLAGS) +# Headset +module_headset_la_SOURCES = modules/module-headset.c +module_headset_la_LDFLAGS = $(MODULE_LDFLAGS) +module_headset_la_LIBADD = $(MODULE_LIBADD) $(DBUS_LIBS) +module_headset_la_CFLAGS = $(AM_CFLAGS) $(DBUS_CFLAGS) + ################################### # Some minor stuff # ################################### diff --git a/src/modules/module-headset.c b/src/modules/module-headset.c new file mode 100644 index 000000000..097ba3e17 --- /dev/null +++ b/src/modules/module-headset.c @@ -0,0 +1,1093 @@ +/*** + This file is part of PulseAudio. + + Copyright 2014 Wim Taymans <wtaymans@redhat.com> + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> +#include <stdlib.h> +#include <sys/types.h> + +#include <pulse/rtclock.h> +#include <pulse/timeval.h> +#include <pulse/xmalloc.h> + +#include <pulsecore/dbus-shared.h> +#include <pulsecore/core-error.h> +#include <pulsecore/core-util.h> +#include <pulsecore/core-rtclock.h> +#include <pulsecore/hashmap.h> +#include <pulsecore/idxset.h> +#include <pulsecore/log.h> +#include <pulsecore/modargs.h> +#include <pulsecore/module.h> +#include <pulsecore/poll.h> +#include <pulsecore/rtpoll.h> +#include <pulsecore/socket-util.h> +#include <pulsecore/thread.h> +#include <pulsecore/time-smoother.h> + +#include "module-headset-symdef.h" + +PA_MODULE_AUTHOR("Wim Taymans"); +PA_MODULE_DESCRIPTION("Create cards for each Headset"); +PA_MODULE_VERSION(PACKAGE_VERSION); +PA_MODULE_LOAD_ONCE(true); + +static const char* const valid_modargs[] = { + NULL +}; + +struct userdata { + pa_module *module; + pa_core *core; + pa_dbus_connection *connection; + bool filter_added; + + pa_hashmap *devices; +}; + +typedef struct pa_headset { + struct userdata *userdata; + + char *path; + pa_card *card; + pa_sink *sink; + pa_source *source; + + pa_thread *thread; + pa_thread_mq thread_mq; + pa_rtpoll *rtpoll; + pa_rtpoll_item *rtpoll_item; + + bool connected; + int stream_fd; + size_t read_link_mtu; + size_t write_link_mtu; + pa_sample_spec sample_spec; + + size_t read_block_size; + size_t write_block_size; + uint64_t read_index; + uint64_t write_index; + pa_usec_t started_at; + pa_smoother *read_smoother; + pa_memchunk write_memchunk; + int stream_write_type; +} pa_headset; + +#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC) +#define FIXED_LATENCY_PLAYBACK (25 * PA_USEC_PER_MSEC) +#define FIXED_LATENCY_RECORD (25 * PA_USEC_PER_MSEC) + +/* Run from I/O thread */ +static void setup_connection(pa_headset *h) { + struct pollfd *pollfd; + int one; + + pa_log_info("Device %s resuming", h->path); + + h->read_block_size = h->read_link_mtu; + h->write_block_size = h->write_link_mtu; + + if (h->sink) { + pa_sink_set_max_request_within_thread(h->sink, h->write_block_size); + pa_sink_set_fixed_latency_within_thread(h->sink, + FIXED_LATENCY_PLAYBACK + + pa_bytes_to_usec(h->write_block_size, &h->sample_spec)); + } + + if (h->source) + pa_source_set_fixed_latency_within_thread(h->source, + FIXED_LATENCY_RECORD + + pa_bytes_to_usec(h->read_block_size, &h->sample_spec)); + + pa_make_fd_nonblock(h->stream_fd); + pa_make_socket_low_delay(h->stream_fd); + + one = 1; + if (setsockopt(h->stream_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)) < 0) + pa_log_warn("Failed to enable SO_TIMESTAMP: %s", pa_cstrerror(errno)); + + pa_log_debug("Device connection set up"); + + h->rtpoll_item = pa_rtpoll_item_new(h->rtpoll, PA_RTPOLL_NEVER, 1); + pollfd = pa_rtpoll_item_get_pollfd(h->rtpoll_item, NULL); + pollfd->fd = h->stream_fd; + pollfd->events = pollfd->revents = 0; + + h->read_index = h->write_index = 0; + h->started_at = 0; + + if (h->source) + h->read_smoother = pa_smoother_new(PA_USEC_PER_SEC, 2*PA_USEC_PER_SEC, true, true, 10, pa_rtclock_now(), true); +} + +static void teardown_connection(pa_headset *h) { + if (h->rtpoll_item) { + pa_rtpoll_item_free(h->rtpoll_item); + h->rtpoll_item = NULL; + } + + if (h->stream_fd >= 0) { + pa_close(h->stream_fd); + h->stream_fd = -1; + } + + if (h->read_smoother) { + pa_smoother_free(h->read_smoother); + h->read_smoother = NULL; + } + + if (h->write_memchunk.memblock) { + pa_memblock_unref(h->write_memchunk.memblock); + pa_memchunk_reset(&h->write_memchunk); + } + pa_log_debug("Device connection torn down"); +} + + +static int headset_connect(pa_headset *h) { + if (h->connected) + return 0; + + pa_log_debug("Connecting to device %s", h->path); + + +#if 0 + h->stream_fd = u->transport->acquire(u->transport, &u->read_link_mtu, &u->write_link_mtu); + if (h->stream_fd < 0) + return -1; +#endif + + h->read_link_mtu = 48; + h->write_link_mtu = 48; + + h->connected = true; + pa_log_info("Device %s connected: fd %d", h->path, h->stream_fd); + + return 0; +} + +static void headset_disconnect(pa_headset *h) { + /* Ignore if already disconnected */ + if (!h->connected) + return; + + pa_log_debug("Disconnecting device %s", h->path); + +#if 0 + u->transport->release(u->transport); +#endif + + h->connected = false; + + teardown_connection(h); +} + +/* Run from IO thread */ +static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + pa_headset *h = PA_SOURCE(o)->userdata; + bool failed = false; + int r; + + pa_assert(h->source == PA_SOURCE(o)); + + switch (code) { + + case PA_SOURCE_MESSAGE_SET_STATE: + + switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { + + case PA_SOURCE_SUSPENDED: + /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ + if (!PA_SOURCE_IS_OPENED(h->source->thread_info.state)) + break; + + /* Stop the device if the sink is suspended as well */ + if (!h->sink || h->sink->state == PA_SINK_SUSPENDED) + headset_disconnect(h); + + if (h->read_smoother) + pa_smoother_pause(h->read_smoother, pa_rtclock_now()); + + break; + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (h->source->thread_info.state != PA_SOURCE_SUSPENDED) + break; + + /* Resume the device if the sink was suspended as well */ + if (!h->sink || !PA_SINK_IS_OPENED(h->sink->thread_info.state)) { + if (headset_connect(h) < 0) + failed = true; + else + setup_connection(h); + } + + /* We don't resume the smoother here. Instead we + * wait until the first packet arrives */ + + break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + break; + } + + break; + + case PA_SOURCE_MESSAGE_GET_LATENCY: { + pa_usec_t wi, ri; + + if (h->read_smoother) { + wi = pa_smoother_get(h->read_smoother, pa_rtclock_now()); + ri = pa_bytes_to_usec(h->read_index, &h->sample_spec); + + *((pa_usec_t*) data) = FIXED_LATENCY_RECORD + wi > ri ? FIXED_LATENCY_RECORD + wi - ri : 0; + } else + *((pa_usec_t*) data) = 0; + + return 0; + } + + } + + r = pa_source_process_msg(o, code, data, offset, chunk); + + return (r < 0 || !failed) ? r : -1; +} + +/* Run from main thread */ +static int add_source(pa_headset *h) { + pa_source_new_data data; + struct userdata *u = h->userdata; + + pa_source_new_data_init(&data); + data.module = u->module; + data.card = h->card; + data.driver = __FILE__; + data.name = pa_sprintf_malloc("headset_source.%s", h->path); + data.namereg_fail = false; + pa_source_new_data_set_sample_spec(&data, &h->sample_spec); + + h->source = pa_source_new(u->core, &data, PA_SOURCE_HARDWARE|PA_SOURCE_LATENCY); + pa_source_new_data_done(&data); + if (!h->source) { + pa_log_error("Failed to create source"); + return -1; + } + + h->source->userdata = h; + h->source->parent.process_msg = source_process_msg; + + return 0; +} + +/* Run from IO thread */ +static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + pa_headset *h = PA_SINK(o)->userdata; + bool failed = false; + int r; + + pa_assert(h->sink == PA_SINK(o)); + + switch (code) { + + case PA_SINK_MESSAGE_SET_STATE: + + switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { + + case PA_SINK_SUSPENDED: + /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ + if (!PA_SINK_IS_OPENED(h->sink->thread_info.state)) + break; + + /* Stop the device if the source is suspended as well */ + if (!h->source || h->source->state == PA_SOURCE_SUSPENDED) + /* We deliberately ignore whether disconnect + * actually worked. Since the stream_fd is + * closed it doesn't really matter */ + headset_disconnect(h); + + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + if (h->sink->thread_info.state != PA_SINK_SUSPENDED) + break; + + /* Resume the device if the source was suspended as well */ + if (!h->source || !PA_SOURCE_IS_OPENED(h->source->thread_info.state)) { + if (headset_connect(h) < 0) + failed = true; + else + setup_connection(h); + } + + break; + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + break; + + case PA_SINK_MESSAGE_GET_LATENCY: { + pa_usec_t wi, ri; + + if (h->read_smoother) { + ri = pa_smoother_get(h->read_smoother, pa_rtclock_now()); + wi = pa_bytes_to_usec(h->write_index + h->write_block_size, &h->sample_spec); + } else { + ri = pa_rtclock_now() - h->started_at; + wi = pa_bytes_to_usec(h->write_index, &h->sample_spec); + } + + *((pa_usec_t*) data) = FIXED_LATENCY_PLAYBACK + wi > ri ? FIXED_LATENCY_PLAYBACK + wi - ri : 0; + + return 0; + } + } + + r = pa_sink_process_msg(o, code, data, offset, chunk); + + return (r < 0 || !failed) ? r : -1; +} + +/* Run from main thread */ +static int add_sink(pa_headset *h) { + pa_sink_new_data data; + struct userdata *u = h->userdata; + + pa_sink_new_data_init(&data); + data.module = u->module; + data.card = h->card; + data.driver = __FILE__; + data.name = pa_sprintf_malloc("headset_sink.%s", h->path); + data.namereg_fail = false; + pa_sink_new_data_set_sample_spec(&data, &h->sample_spec); + + h->sink = pa_sink_new(u->core, &data, PA_SINK_HARDWARE|PA_SINK_LATENCY); + pa_sink_new_data_done(&data); + if (!h->sink) { + pa_log_error("Failed to create sink"); + return -1; + } + + h->sink->userdata = h; + h->sink->parent.process_msg = sink_process_msg; + + return 0; +} + +static int sco_process_render(pa_headset *h) { + int ret = 0; + + pa_assert(h); + pa_assert(h->sink); + + /* First, render some data */ + if (!h->write_memchunk.memblock) + pa_sink_render_full(h->sink, h->write_block_size, &h->write_memchunk); + + pa_assert(h->write_memchunk.length == h->write_block_size); + + for (;;) { + ssize_t l; + const void *p; + + /* Now write that data to the socket. The socket is of type + * SEQPACKET, and we generated the data of the MTU size, so this + * should just work. */ + + p = (const uint8_t *) pa_memblock_acquire_chunk(&h->write_memchunk); + l = pa_write(h->stream_fd, p, h->write_memchunk.length, &h->stream_write_type); + pa_memblock_release(h->write_memchunk.memblock); + + pa_assert(l != 0); + + if (l < 0) { + + if (errno == EINTR) + /* Retry right away if we got interrupted */ + continue; + else if (errno == EAGAIN) + /* Hmm, apparently the socket was not writable, give up for now */ + break; + + pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno)); + ret = -1; + break; + } + + pa_assert((size_t) l <= h->write_memchunk.length); + + if ((size_t) l != h->write_memchunk.length) { + pa_log_error("Wrote memory block to socket only partially! %llu written, wanted to write %llu.", + (unsigned long long) l, + (unsigned long long) h->write_memchunk.length); + ret = -1; + break; + } + + h->write_index += (uint64_t) h->write_memchunk.length; + pa_memblock_unref(h->write_memchunk.memblock); + pa_memchunk_reset(&h->write_memchunk); + + ret = 1; + break; + } + + return ret; +} + +/* Run from IO thread */ +static int sco_process_push(pa_headset *h) { + int ret = 0; + pa_memchunk memchunk; + struct userdata *u; + + pa_assert(h); + pa_assert(h->source); + pa_assert(h->read_smoother); + + u = h->userdata; + + memchunk.memblock = pa_memblock_new(u->core->mempool, h->read_block_size); + memchunk.index = memchunk.length = 0; + + for (;;) { + ssize_t l; + void *p; + struct msghdr m; + struct cmsghdr *cm; + uint8_t aux[1024]; + struct iovec iov; + bool found_tstamp = false; + pa_usec_t tstamp; + + memset(&m, 0, sizeof(m)); + memset(&aux, 0, sizeof(aux)); + memset(&iov, 0, sizeof(iov)); + + m.msg_iov = &iov; + m.msg_iovlen = 1; + m.msg_control = aux; + m.msg_controllen = sizeof(aux); + + p = pa_memblock_acquire(memchunk.memblock); + iov.iov_base = p; + iov.iov_len = pa_memblock_get_length(memchunk.memblock); + l = recvmsg(h->stream_fd, &m, 0); + pa_memblock_release(memchunk.memblock); + + if (l <= 0) { + + if (l < 0 && errno == EINTR) + /* Retry right away if we got interrupted */ + continue; + + else if (l < 0 && errno == EAGAIN) + /* Hmm, apparently the socket was not readable, give up for now. */ + break; + + pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF"); + ret = -1; + break; + } + pa_assert((size_t) l <= pa_memblock_get_length(memchunk.memblock)); + + /* In some rare occasions, we might receive packets of a very strange + * size. This could potentially be possible if the SCO packet was + * received partially over-the-air, or more probably due to hardware + * issues in our Bluetooth adapter. In these cases, in order to avoid + * an assertion failure due to unaligned data, just discard the whole + * packet */ + if (!pa_frame_aligned(l, &h->sample_spec)) { + pa_log_warn("SCO packet received of unaligned size: %zu", l); + break; + } + + memchunk.length = (size_t) l; + h->read_index += (uint64_t) l; + + for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm)) + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { + struct timeval *tv = (struct timeval*) CMSG_DATA(cm); + pa_rtclock_from_wallclock(tv); + tstamp = pa_timeval_load(tv); + found_tstamp = true; + break; + } + + if (!found_tstamp) { + pa_log_warn("Couldn't find SO_TIMESTAMP data in auxiliary recvmsg() data!"); + tstamp = pa_rtclock_now(); + } + + pa_smoother_put(h->read_smoother, tstamp, pa_bytes_to_usec(h->read_index, &h->sample_spec)); + pa_smoother_resume(h->read_smoother, tstamp, true); + + pa_source_post(h->source, &memchunk); + + ret = l; + break; + } + + pa_memblock_unref(memchunk.memblock); + + return ret; +} + +/* I/O thread function */ +static void thread_func(void *userdata) { + pa_headset *h = userdata; + struct userdata *u = h->userdata; + unsigned do_write = 0; + unsigned pending_read_bytes = 0; + bool writable = false; + + pa_assert(h); + + pa_log_debug("IO Thread starting up"); + + if (u->core->realtime_scheduling) + pa_make_realtime(u->core->realtime_priority); + + pa_thread_mq_install(&h->thread_mq); + + /* Setup the stream only if the device was already connected */ + if (h->connected) + setup_connection(h); + + for (;;) { + struct pollfd *pollfd; + int ret; + bool disable_timer = true; + + pollfd = h->rtpoll_item ? pa_rtpoll_item_get_pollfd(h->rtpoll_item, NULL) : NULL; + + if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) { + pa_log_info("FD error: %s%s%s%s", + pollfd->revents & POLLERR ? "POLLERR " :"", + pollfd->revents & POLLHUP ? "POLLHUP " :"", + pollfd->revents & POLLPRI ? "POLLPRI " :"", + pollfd->revents & POLLNVAL ? "POLLNVAL " :""); + + if (pollfd->revents & POLLHUP) { + pollfd = NULL; + teardown_connection(h); + do_write = 0; + pending_read_bytes = 0; + writable = false; + } else + goto fail; + } + + if (h->source && PA_SOURCE_IS_LINKED(h->source->thread_info.state)) { + + /* We should send two blocks to the device before we expect + * a response. */ + + if (h->write_index == 0 && h->read_index <= 0) + do_write = 2; + + if (pollfd && (pollfd->revents & POLLIN)) { + int n_read; + + n_read = sco_process_push(h); + + if (n_read < 0) + goto fail; + + if (n_read > 0) { + /* We just read something, so we are supposed to write something, too */ + pending_read_bytes += n_read; + do_write += pending_read_bytes / h->write_block_size; + pending_read_bytes = pending_read_bytes % h->write_block_size; + } + } + } + + if (h->sink && PA_SINK_IS_LINKED(h->sink->thread_info.state)) { + + if (PA_UNLIKELY(h->sink->thread_info.rewind_requested)) + pa_sink_process_rewind(h->sink, 0); + + if (pollfd) { + if (pollfd->revents & POLLOUT) + writable = true; + + if ((!h->source || !PA_SOURCE_IS_LINKED(h->source->thread_info.state)) && do_write <= 0 && writable) { + pa_usec_t time_passed; + pa_usec_t audio_sent; + + /* Hmm, there is no input stream we could synchronize + * to. So let's do things by time */ + + time_passed = pa_rtclock_now() - h->started_at; + audio_sent = pa_bytes_to_usec(h->write_index, &h->sample_spec); + + if (audio_sent <= time_passed) { + pa_usec_t audio_to_send = time_passed - audio_sent; + + /* Never try to catch up for more than 100ms */ + if (h->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { + pa_usec_t skip_usec; + uint64_t skip_bytes; + + skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; + skip_bytes = pa_usec_to_bytes(skip_usec, &h->sample_spec); + + if (skip_bytes > 0) { + pa_memchunk tmp; + + pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", + (unsigned long long) skip_usec, + (unsigned long long) skip_bytes); + + pa_sink_render_full(h->sink, skip_bytes, &tmp); + pa_memblock_unref(tmp.memblock); + h->write_index += skip_bytes; + } + } + + do_write = 1; + pending_read_bytes = 0; + } + } + + if (writable && do_write > 0) { + int n_written; + + if (h->write_index <= 0) + h->started_at = pa_rtclock_now(); + + if ((n_written = sco_process_render(h)) < 0) + goto fail; + + if (n_written == 0) + pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); + + do_write -= n_written; + writable = false; + } + + if ((!h->source || !PA_SOURCE_IS_LINKED(h->source->thread_info.state)) && do_write <= 0) { + pa_usec_t sleep_for; + pa_usec_t time_passed, next_write_at; + + if (writable) { + /* Hmm, there is no input stream we could synchronize + * to. So let's estimate when we need to wake up the latest */ + time_passed = pa_rtclock_now() - h->started_at; + next_write_at = pa_bytes_to_usec(h->write_index, &h->sample_spec); + sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; + /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ + } else + /* drop stream every 500 ms */ + sleep_for = PA_USEC_PER_MSEC * 500; + + pa_rtpoll_set_timer_relative(h->rtpoll, sleep_for); + disable_timer = false; + } + } + } + + if (disable_timer) + pa_rtpoll_set_timer_disabled(h->rtpoll); + + /* Hmm, nothing to do. Let's sleep */ + if (pollfd) + pollfd->events = (short) (((h->sink && PA_SINK_IS_LINKED(h->sink->thread_info.state) && !writable) ? POLLOUT : 0) | + (h->source && PA_SOURCE_IS_LINKED(h->source->thread_info.state) ? POLLIN : 0)); + + if ((ret = pa_rtpoll_run(h->rtpoll, true)) < 0) { + pa_log_debug("pa_rtpoll_run failed with: %d", ret); + goto fail; + } + if (ret == 0) { + pa_log_debug("IO thread shutdown requested, stopping cleanly"); + headset_disconnect(h); + goto finish; + } + } + +fail: + /* If this was no regular exit from the loop we have to continue processing messages until we receive PA_MESSAGE_SHUTDOWN */ + pa_log_debug("IO thread failed"); + pa_asyncmsgq_wait_for(h->thread_mq.inq, PA_MESSAGE_SHUTDOWN); + +finish: + pa_log_debug("IO thread shutting down"); +} + +/* Run from main thread */ +static int start_thread(pa_headset *h) { + struct userdata *u; + + pa_assert(h); + pa_assert(!h->thread); + pa_assert(!h->rtpoll); + pa_assert(!h->rtpoll_item); + + u = h->userdata; + + h->rtpoll = pa_rtpoll_new(); + pa_thread_mq_init(&h->thread_mq, u->core->mainloop, h->rtpoll); + + if (!(h->thread = pa_thread_new("headset", thread_func, h))) { + pa_log_error("Failed to create IO thread"); + return -1; + } + + if (h->sink) { + pa_sink_set_asyncmsgq(h->sink, h->thread_mq.inq); + pa_sink_set_rtpoll(h->sink, h->rtpoll); + pa_sink_put(h->sink); + + if (h->sink->set_volume) + h->sink->set_volume(h->sink); + } + + if (h->source) { + pa_source_set_asyncmsgq(h->source, h->thread_mq.inq); + pa_source_set_rtpoll(h->source, h->rtpoll); + pa_source_put(h->source); + + if (h->source->set_volume) + h->source->set_volume(h->source); + } + + return 0; +} + +/* Run from main thread */ +static void stop_thread(pa_headset *h) { + pa_assert(h); + + if (h->sink) + pa_sink_unlink(h->sink); + + if (h->source) + pa_source_unlink(h->source); + + if (h->thread) { + pa_asyncmsgq_send(h->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL); + pa_thread_free(h->thread); + h->thread = NULL; + } + + if (h->rtpoll_item) { + pa_rtpoll_item_free(h->rtpoll_item); + h->rtpoll_item = NULL; + } + + if (h->rtpoll) { + pa_thread_mq_done(&h->thread_mq); + pa_rtpoll_free(h->rtpoll); + h->rtpoll = NULL; + } + + headset_disconnect(h); + + if (h->sink) { + pa_sink_unref(h->sink); + h->sink = NULL; + } + + if (h->source) { + pa_source_unref(h->source); + h->source = NULL; + } + + if (h->read_smoother) { + pa_smoother_free(h->read_smoother); + h->read_smoother = NULL; + } +} + +static int card_set_profile(pa_card *c, pa_card_profile *new_profile) { + return 0; +} + +static void add_device(struct userdata *u, const char *path, DBusMessageIter *props) { + pa_headset *h; + pa_card_new_data card_new_data; + pa_card_profile *p; + + pa_log_debug("Device %s added", path); + + h = pa_xnew0(pa_headset, 1); + h->path = pa_xstrdup (path); + h->userdata = u; + + /* make a card */ + pa_card_new_data_init(&card_new_data); + pa_proplist_sets(card_new_data.proplist, PA_PROP_DEVICE_STRING, path); + card_new_data.driver = __FILE__; + pa_card_new_data_set_name(&card_new_data, path); + + /* add on profile */ + p = pa_card_profile_new("on", _("On"), 0); + pa_hashmap_put(card_new_data.profiles, p->name, p); + + /* create the card object */ + h->card = pa_card_new(u->core, &card_new_data); + if (!h->card) { + pa_log("Unable to create card.\n"); + return; + } + + pa_card_new_data_done(&card_new_data); + + h->card->userdata = h; + h->card->set_profile = card_set_profile; + + h->sample_spec.format = PA_SAMPLE_S16LE; + h->sample_spec.channels = 1; + h->sample_spec.rate = 8000; + + if (add_sink(h) < 0) { + pa_log("Unable to add sink\n"); + return; + } + + if (add_source(h) < 0) { + pa_log("Unable to add source\n"); + return; + } + + if (start_thread(h) < 0) { + pa_log("Unable to start thread\n"); + return; + } + pa_assert_se(pa_hashmap_put(u->devices, h->path, h) >= 0); +} + +static void remove_device(struct userdata *u, const char *path) { + pa_headset *h; + + pa_log_debug("Device %s removed", path); + + if (!(h = pa_hashmap_get(u->devices, path))) { + pa_log_error("Asked to remove unknown device %s", path); + return; + } + + stop_thread(h); + + if (h->card) + pa_card_free(h->card); + + pa_xfree(h->path); + pa_xfree(h); +} + +static DBusHandlerResult filter_cb(DBusConnection *bus, DBusMessage *message, void *userdata) { + struct userdata *u = userdata; + DBusError error; + const char *path; + + pa_assert(bus); + pa_assert(message); + pa_assert(u); + + dbus_error_init(&error); + + if (dbus_message_is_signal(message, "org.freedesktop.HeadsetManager", "DeviceAdded")) { + + if (!dbus_message_get_args(message, &error, DBUS_TYPE_OBJECT_PATH, &path, DBUS_TYPE_INVALID)) { + pa_log_error("Failed to parse DeviceAdded signal: %s: %s", error.name, error.message); + goto finish; + } + + add_device(u, path, NULL); + + } else if (dbus_message_is_signal(message, "org.freedesktop.HeadsetManager", "DeviceRemoved")) { + + if (!dbus_message_get_args(message, &error, DBUS_TYPE_OBJECT_PATH, &path, DBUS_TYPE_INVALID)) { + pa_log_error("Failed to parse DeviceRemoved signal: %s: %s", error.name, error.message); + goto finish; + } + + remove_device(u, path); + } + +finish: + dbus_error_free(&error); + + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; +} + +static int get_headset_list(struct userdata *u) { + DBusError error; + DBusMessage *m = NULL, *reply = NULL; + DBusMessageIter iter, devices_i; + int ret = -1; + + pa_assert(u); + + dbus_error_init(&error); + + if (!(m = dbus_message_new_method_call("org.freedesktop.Headset", "/", "org.freedesktop.HeadsetManager", "GetDevices"))) { + pa_log("Failed to allocate GetDevices() method call."); + goto fail; + } + + if (!(reply = dbus_connection_send_with_reply_and_block(pa_dbus_connection_get(u->connection), m, -1, &error))) { + pa_log("GetDevices() call failed: %s: %s", error.name, error.message); + goto fail; + } + + if (!dbus_message_iter_init(reply, &iter) || !pa_streq(dbus_message_get_signature(reply), "a(oa{sv})")) { + pa_log("Invalid GetDevices() reply signature: %s", dbus_message_get_signature(reply)); + goto fail; + } + + dbus_message_iter_recurse(&iter, &devices_i); + + while (dbus_message_iter_get_arg_type(&devices_i) == DBUS_TYPE_DICT_ENTRY) { + DBusMessageIter device_i, props_i; + const char *path; + + dbus_message_iter_recurse(&devices_i, &device_i); + + pa_assert(dbus_message_iter_get_arg_type(&device_i) == DBUS_TYPE_STRING); + dbus_message_iter_get_basic(&device_i, &path); + + pa_assert_se(dbus_message_iter_next(&device_i)); + pa_assert(dbus_message_iter_get_arg_type(&device_i) == DBUS_TYPE_ARRAY); + + dbus_message_iter_recurse(&device_i, &props_i); + + add_device (u, path, &props_i); + + dbus_message_iter_next(&devices_i); + } + ret = 0; + +fail: + + if (m) + dbus_message_unref(m); + + if (reply) + dbus_message_unref(reply); + + dbus_error_free(&error); + + return ret; +} + +int pa__init(pa_module*m) { + DBusError error; + pa_dbus_connection *connection; + struct userdata *u = NULL; + pa_modargs *ma; + + pa_assert(m); + + dbus_error_init(&error); + + if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { + pa_log("Failed to parse module arguments"); + goto fail; + } + + if (!(connection = pa_dbus_bus_get(m->core, DBUS_BUS_SYSTEM, &error)) || dbus_error_is_set(&error)) { + + if (connection) + pa_dbus_connection_unref(connection); + + pa_log_error("Unable to contact D-Bus system bus: %s: %s", error.name, error.message); + goto fail; + } + + m->userdata = u = pa_xnew0(struct userdata, 1); + u->core = m->core; + u->module = m; + u->connection = connection; + u->devices = pa_hashmap_new_full(pa_idxset_string_hash_func, pa_idxset_string_compare_func, NULL, NULL); + + if (!dbus_connection_add_filter(pa_dbus_connection_get(connection), filter_cb, u, NULL)) { + pa_log_error("Failed to add filter function"); + goto fail; + } + + u->filter_added = true; + + if (pa_dbus_add_matches( + pa_dbus_connection_get(connection), &error, + "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceAdded'", + "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceRemoved'", NULL) < 0) { + pa_log_error("Unable to subscribe to Headset signals: %s: %s", error.name, error.message); + goto fail; + } + + if (get_headset_list(u) < 0) + goto fail; + + pa_modargs_free(ma); + + return 0; + +fail: + if (ma) + pa_modargs_free(ma); + + dbus_error_free(&error); + pa__done(m); + + return -1; +} + +void pa__done(pa_module *m) { + struct userdata *u; + + pa_assert(m); + + if (!(u = m->userdata)) + return; + + if (u->connection) { + pa_dbus_remove_matches( + pa_dbus_connection_get(u->connection), + "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceAdded'", + "type='signal',sender='org.freedesktop.Headset',interface='org.freedesktop.HeadsetManager',member='DeviceRemoved'", NULL) ; + + if (u->filter_added) + dbus_connection_remove_filter(pa_dbus_connection_get(u->connection), filter_cb, u); + + pa_dbus_connection_unref(u->connection); + } + + pa_xfree(u); +} |