diff options
author | ckdo <chrisrodri@free.fr> | 2019-08-15 08:14:42 +0200 |
---|---|---|
committer | Georg Chini <georg@chini.tk> | 2019-12-22 09:43:14 +0000 |
commit | 48545240582cade1cd4e62e2c6c0ad72f33c7014 (patch) | |
tree | 3347233b3799d7b8e976e352152d0b399896ae57 | |
parent | 46dd3be8ce74e8b74ef493fbb995dabbc4341e6d (diff) |
raop: Add autoreconnect feature
This patch adds the autoreconnect feature to the raop module.
This is mainly to be used in a server context, but can be used
also in a desktop usage context.
With autoreconnect feature, the raop module behaves like this:
- At initialisation or in case of the RTSP TCP connection lost, it
tries to reconnect every 5 seconds
- In case of any fatal error, it tries to reconnect every 5 seconds
- In UDP mode, if no timing packets received anymore for a long time,
RTSP connection is closed, then it tries to reconnect..
- After reconnection, once RTSP session has been established again,
playing is resumed automatically.
- When the connection is not established yet (or loss), the sink
behaves like a null sink. In the source code I called it "autonull",
even if autonull is set to autoreconnect param value, it could be
split into two different params.
-rw-r--r-- | src/modules/raop/module-raop-sink.c | 1 | ||||
-rw-r--r-- | src/modules/raop/raop-client.c | 44 | ||||
-rw-r--r-- | src/modules/raop/raop-client.h | 4 | ||||
-rw-r--r-- | src/modules/raop/raop-sink.c | 265 | ||||
-rw-r--r-- | src/modules/rtp/rtsp_client.c | 40 | ||||
-rw-r--r-- | src/modules/rtp/rtsp_client.h | 2 |
6 files changed, 292 insertions, 64 deletions
diff --git a/src/modules/raop/module-raop-sink.c b/src/modules/raop/module-raop-sink.c index 76004c487..393341a2d 100644 --- a/src/modules/raop/module-raop-sink.c +++ b/src/modules/raop/module-raop-sink.c @@ -62,6 +62,7 @@ static const char* const valid_modargs[] = { "username", "password", "latency_msec", + "autoreconnect", NULL }; diff --git a/src/modules/raop/raop-client.c b/src/modules/raop/raop-client.c index 5d45302bc..9a026db24 100644 --- a/src/modules/raop/raop-client.c +++ b/src/modules/raop/raop-client.c @@ -95,6 +95,7 @@ struct pa_raop_client { pa_rtsp_client *rtsp; char *sci, *sid; char *password; + bool autoreconnect; pa_raop_protocol_t protocol; pa_raop_encryption_t encryption; @@ -1379,8 +1380,39 @@ static void rtsp_auth_cb(pa_rtsp_client *rtsp, pa_rtsp_state_t state, pa_rtsp_st } } + +void pa_raop_client_disconnect(pa_raop_client *c) { + c->is_recording = false; + + if (c->tcp_sfd >= 0) + pa_close(c->tcp_sfd); + c->tcp_sfd = -1; + + if (c->udp_sfd >= 0) + pa_close(c->udp_sfd); + c->udp_sfd = -1; + + /* Polling sockets will be closed by sink */ + c->udp_cfd = c->udp_tfd = -1; + c->tcp_sfd = -1; + + pa_log_error("RTSP control channel closed (disconnected)"); + + if (c->rtsp) + pa_rtsp_client_free(c->rtsp); + if (c->sid) + pa_xfree(c->sid); + c->rtsp = NULL; + c->sid = NULL; + + if (c->state_callback) + c->state_callback((int) PA_RAOP_DISCONNECTED, c->state_userdata); + +} + + pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol, - pa_raop_encryption_t encryption, pa_raop_codec_t codec) { + pa_raop_encryption_t encryption, pa_raop_codec_t codec, bool autoreconnect) { pa_raop_client *c; pa_parsed_address a; @@ -1408,6 +1440,7 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot c->rtsp = NULL; c->sci = c->sid = NULL; c->password = NULL; + c->autoreconnect = autoreconnect; c->protocol = protocol; c->encryption = encryption; @@ -1473,7 +1506,7 @@ int pa_raop_client_authenticate (pa_raop_client *c, const char *password) { c->password = NULL; if (password) c->password = pa_xstrdup(password); - c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT); + c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT, c->autoreconnect); pa_assert(c->rtsp); @@ -1502,7 +1535,7 @@ int pa_raop_client_announce(pa_raop_client *c) { return 1; } - c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT); + c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT, c->autoreconnect); pa_assert(c->rtsp); @@ -1545,7 +1578,6 @@ bool pa_raop_client_can_stream(pa_raop_client *c) { pa_assert(c); if (!c->rtsp || !c->sci) { - pa_log_debug("Can't stream, connection not established yet..."); return false; } @@ -1729,6 +1761,10 @@ bool pa_raop_client_register_pollfd(pa_raop_client *c, pa_rtpoll *poll, pa_rtpol return oob; } +bool pa_raop_client_is_timing_fd(pa_raop_client *c, const int fd) { + return fd == c->udp_tfd; +} + pa_volume_t pa_raop_client_adjust_volume(pa_raop_client *c, pa_volume_t volume) { double minv, maxv; diff --git a/src/modules/raop/raop-client.h b/src/modules/raop/raop-client.h index e72459e94..faec01e65 100644 --- a/src/modules/raop/raop-client.h +++ b/src/modules/raop/raop-client.h @@ -57,7 +57,7 @@ typedef enum pa_raop_state { } pa_raop_state_t; pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol, - pa_raop_encryption_t encryption, pa_raop_codec_t codec); + pa_raop_encryption_t encryption, pa_raop_codec_t codec, bool autoreconnect); void pa_raop_client_free(pa_raop_client *c); int pa_raop_client_authenticate(pa_raop_client *c, const char *password); @@ -71,9 +71,11 @@ int pa_raop_client_stream(pa_raop_client *c); int pa_raop_client_set_volume(pa_raop_client *c, pa_volume_t volume); int pa_raop_client_flush(pa_raop_client *c); int pa_raop_client_teardown(pa_raop_client *c); +void pa_raop_client_disconnect(pa_raop_client *c); void pa_raop_client_get_frames_per_block(pa_raop_client *c, size_t *size); bool pa_raop_client_register_pollfd(pa_raop_client *c, pa_rtpoll *poll, pa_rtpoll_item **poll_item); +bool pa_raop_client_is_timing_fd(pa_raop_client *c, const int fd); pa_volume_t pa_raop_client_adjust_volume(pa_raop_client *c, pa_volume_t volume); void pa_raop_client_handle_oob_packet(pa_raop_client *c, const int fd, const uint8_t packet[], ssize_t size); ssize_t pa_raop_client_send_audio_packet(pa_raop_client *c, pa_memchunk *block, size_t offset); diff --git a/src/modules/raop/raop-sink.c b/src/modules/raop/raop-sink.c index bb2723a5f..aa66af2ff 100644 --- a/src/modules/raop/raop-sink.c +++ b/src/modules/raop/raop-sink.c @@ -59,12 +59,16 @@ #include <pulsecore/thread-mq.h> #include <pulsecore/poll.h> #include <pulsecore/rtpoll.h> +#include <pulsecore/core-rtclock.h> #include <pulsecore/time-smoother.h> #include "raop-sink.h" #include "raop-client.h" #include "raop-util.h" +#define UDP_TIMING_PACKET_LOSS_MAX (30 * PA_USEC_PER_SEC) +#define UDP_TIMING_PACKET_DISCONNECT_CYCLE 3 + struct userdata { pa_core *core; pa_module *module; @@ -78,11 +82,16 @@ struct userdata { bool oob; pa_raop_client *raop; + char *server; pa_raop_protocol_t protocol; pa_raop_encryption_t encryption; pa_raop_codec_t codec; + bool autoreconnect; + /* if true, behaves like a null-sink when disconnected */ + bool autonull; size_t block_size; + pa_usec_t block_usec; pa_memchunk memchunk; pa_usec_t delay; @@ -91,10 +100,13 @@ struct userdata { uint64_t write_count; uint32_t latency; + /* Consider as first I/O thread iteration, can be switched to true in autoreconnect mode */ + bool first; }; enum { - PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX + PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX, + PA_SINK_MESSAGE_DISCONNECT_REQUEST }; static void userdata_free(struct userdata *u); @@ -136,10 +148,23 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_assert(u->raop); switch (code) { + /* Exception : for this message, we are in main thread, msg sent from the IO/thread + Done here, as alloc/free of rtsp_client is also done in this thread for other cases */ + case PA_SINK_MESSAGE_DISCONNECT_REQUEST: { + if (u->sink->state == PA_SINK_RUNNING) { + /* Disconnect raop client, and restart the whole chain since + * the authentication token might be outdated */ + pa_raop_client_disconnect(u->raop); + pa_raop_client_authenticate(u->raop, NULL); + } + + return 0; + } + case PA_SINK_MESSAGE_GET_LATENCY: { int64_t r = 0; - if (pa_raop_client_can_stream(u->raop)) + if (u->autonull || pa_raop_client_can_stream(u->raop)) r = sink_get_latency(u); *((int64_t*) data) = r; @@ -154,6 +179,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_module_unload_request(u->module, true); } + if (u->autoreconnect && u->sink->state == PA_SINK_RUNNING) { + pa_usec_t now; + now = pa_rtclock_now(); + pa_smoother_reset(u->smoother, now, false); + + if (!pa_raop_client_is_alive(u->raop)) { + /* Connecting will trigger a RECORD and start steaming */ + pa_raop_client_announce(u->raop); + } + } + return 0; } @@ -171,6 +207,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse now = pa_rtclock_now(); u->write_count = 0; u->start = now; + u->first = true; pa_rtpoll_set_timer_absolute(u->rtpoll, now); if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { @@ -205,10 +242,22 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse u->rtpoll_item = NULL; } - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { pa_rtpoll_set_timer_disabled(u->rtpoll); - else if (u->sink->thread_info.state != PA_SINK_IDLE) - pa_module_unload_request(u->module, true); + + return 0; + } + + if (u->autoreconnect) { + if (u->sink->thread_info.state != PA_SINK_IDLE) { + if (!u->autonull) + pa_rtpoll_set_timer_disabled(u->rtpoll); + pa_raop_client_authenticate(u->raop, NULL); + } + } else { + if (u->sink->thread_info.state != PA_SINK_IDLE) + pa_module_unload_request(u->module, true); + } return 0; } @@ -265,8 +314,17 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, now = pa_rtclock_now(); pa_smoother_reset(u->smoother, now, false); + /* If autonull is enabled, I/O thread is always eating chunks since + * it is emulating a null sink */ + if (u->autonull) { + u->start = now; + u->write_count = 0; + u->first = true; + pa_rtpoll_set_timer_absolute(u->rtpoll, now); + } + if (!pa_raop_client_is_alive(u->raop)) { - /* Connecting will trigger a RECORD and start steaming */ + /* Connecting will trigger a RECORD and start streaming */ pa_raop_client_announce(u->raop); } else if (!pa_raop_client_is_recording(u->raop)) { /* RECORD alredy sent, simply start streaming */ @@ -342,6 +400,8 @@ static void sink_set_mute_cb(pa_sink *s) { static void thread_func(void *userdata) { struct userdata *u = userdata; size_t offset = 0; + pa_usec_t last_timing; + uint32_t check_timing_count; pa_assert(u); @@ -357,6 +417,7 @@ static void thread_func(void *userdata) { uint64_t position; size_t index; int ret; + bool canstream, sendstream, on_timeout; /* Polling (audio data + control socket + timing socket). */ if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) @@ -369,6 +430,7 @@ static void thread_func(void *userdata) { pa_sink_process_rewind(u->sink, 0); } + on_timeout = pa_rtpoll_timer_elapsed(u->rtpoll); if (u->rtpoll_item) { pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds); /* If !oob: streaming driven by pollds (POLLOUT) */ @@ -384,12 +446,19 @@ static void thread_func(void *userdata) { } /* if oob: streaming managed by timing, pollfd for oob sockets */ - if (pollfd && u->oob && !pa_rtpoll_timer_elapsed(u->rtpoll)) { + if (pollfd && u->oob && !on_timeout) { uint8_t packet[32]; ssize_t read; for (i = 0; i < nbfds; i++) { if (pollfd->revents & POLLERR) { + if (u->autoreconnect && pa_raop_client_is_alive(u->raop)) { + pollfd->revents = 0; + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), + PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL); + continue; + } + /* one of UDP fds is in faulty state, may have been disconnected, this is fatal */ goto fail; } @@ -397,6 +466,10 @@ static void thread_func(void *userdata) { pollfd->revents = 0; read = pa_read(pollfd->fd, packet, sizeof(packet), NULL); pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read); + if (pa_raop_client_is_timing_fd(u->raop, pollfd->fd)) { + last_timing = pa_rtclock_now(); + check_timing_count = 1; + } } pollfd++; @@ -406,65 +479,133 @@ static void thread_func(void *userdata) { } } - if (u->sink->thread_info.state != PA_SINK_RUNNING) - continue; - if (!pa_raop_client_can_stream(u->raop)) + if (u->sink->thread_info.state != PA_SINK_RUNNING) { continue; + } - /* This assertion is meant to silence a complaint from Coverity about - * pollfd being possibly NULL when we access it later. That's a false - * positive, because we check pa_raop_client_can_stream() above, and if - * that returns true, it means that the connection is up, and when the - * connection is up, pollfd will be non-NULL. */ - pa_assert(pollfd); + if (u->first) { + last_timing = 0; + check_timing_count = 1; + intvl = 0; + u->first = false; + } - if (u->memchunk.length <= 0) { - if (u->memchunk.memblock) - pa_memblock_unref(u->memchunk.memblock); - pa_memchunk_reset(&u->memchunk); + canstream = pa_raop_client_can_stream(u->raop); + now = pa_rtclock_now(); + + if (u->oob && u->autoreconnect && on_timeout) { + if (!canstream) { + last_timing = 0; + } else if (last_timing != 0) { + pa_usec_t since = now - last_timing; + /* Incoming Timing packets should be received every 3 seconds in UDP mode + according to raop specifications. + Here we disconnect if no packet received since UDP_TIMING_PACKET_LOSS_MAX seconds + We only detect timing packet requests interruptions (we do nothing if no packet received at all), since some clients do not implement RTCP Timing requests at all */ + + if (since > (UDP_TIMING_PACKET_LOSS_MAX/UDP_TIMING_PACKET_DISCONNECT_CYCLE)*check_timing_count) { + if (check_timing_count < UDP_TIMING_PACKET_DISCONNECT_CYCLE) { + uint32_t since_in_sec = since / PA_USEC_PER_SEC; + pa_log_warn( + "UDP Timing Packets Warn #%d/%d- Nothing received since %d seconds from %s", + check_timing_count, + UDP_TIMING_PACKET_DISCONNECT_CYCLE-1, since_in_sec, u->server); + check_timing_count++; + } else { + /* Limit reached, then request disconnect */ + check_timing_count = 1; + last_timing = 0; + if (pa_raop_client_is_alive(u->raop)) { + pa_log_warn("UDP Timing Packets Warn limit reached - Requesting reconnect"); + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), + PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL); + continue; + } + } + } + } + } - /* Grab unencoded audio data from PulseAudio */ - pa_sink_render_full(u->sink, u->block_size, &u->memchunk); - offset = u->memchunk.index; + if (!u->autonull) { + if (!canstream) { + pa_log_debug("Can't stream, connection not established yet..."); + continue; + } + /* This assertion is meant to silence a complaint from Coverity about + * pollfd being possibly NULL when we access it later. That's a false + * positive, because we check pa_raop_client_can_stream() above, and if + * that returns true, it means that the connection is up, and when the + * connection is up, pollfd will be non-NULL. */ + pa_assert(pollfd); } - pa_assert(u->memchunk.length > 0); - - index = u->memchunk.index; - if (pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) { - if (errno == EINTR) { - /* Just try again. */ - pa_log_debug("Failed to write data to FIFO (EINTR), retrying"); - goto fail; - } else if (errno != EAGAIN && !u->oob) { - /* Buffer is full, wait for POLLOUT. */ - pollfd->events = POLLOUT; - pollfd->revents = 0; - } else { - pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); - goto fail; + if (u->memchunk.length <= 0) { + if (intvl < now + u->block_usec) { + if (u->memchunk.memblock) + pa_memblock_unref(u->memchunk.memblock); + pa_memchunk_reset(&u->memchunk); + + /* Grab unencoded audio data from PulseAudio */ + pa_sink_render_full(u->sink, u->block_size, &u->memchunk); + offset = u->memchunk.index; } - } else { - u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index; - position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec); + } - now = pa_rtclock_now(); - estimated = pa_bytes_to_usec(position, &u->sink->sample_spec); - pa_smoother_put(u->smoother, now, estimated); - - if (u->oob && !pollfd->revents) { - /* Sleep until next packet transmission */ - intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec); - pa_rtpoll_set_timer_absolute(u->rtpoll, intvl); - } else if (!u->oob) { - if (u->memchunk.length > 0) { - pollfd->events = POLLOUT; - pollfd->revents = 0; + if (u->memchunk.length > 0) { + index = u->memchunk.index; + sendstream = !u->autonull || (u->autonull && canstream); + if (sendstream && pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) { + if (errno == EINTR) { + /* Just try again. */ + pa_log_debug("Failed to write data to FIFO (EINTR), retrying"); + if (u->autoreconnect) { + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST, + 0, 0, NULL, NULL); + continue; + } else + goto fail; + } else if (errno != EAGAIN && !u->oob) { + /* Buffer is full, wait for POLLOUT. */ + if (!u->oob) { + pollfd->events = POLLOUT; + pollfd->revents = 0; + } } else { + pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno)); + if (u->autoreconnect) { + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST, + 0, 0, NULL, NULL); + continue; + } else + goto fail; + } + } else { + if (sendstream) { + u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index; + } else { + u->write_count += u->memchunk.length; + u->memchunk.length = 0; + } + position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec); + + now = pa_rtclock_now(); + estimated = pa_bytes_to_usec(position, &u->sink->sample_spec); + pa_smoother_put(u->smoother, now, estimated); + + if ((u->autonull && !canstream) || (u->oob && canstream && on_timeout)) { + /* Sleep until next packet transmission */ intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec); pa_rtpoll_set_timer_absolute(u->rtpoll, intvl); - pollfd->revents = 0; - pollfd->events = 0; + } else if (!u->oob) { + if (u->memchunk.length > 0) { + pollfd->events = POLLOUT; + pollfd->revents = 0; + } else { + intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec); + pa_rtpoll_set_timer_absolute(u->rtpoll, intvl); + pollfd->revents = 0; + pollfd->events = 0; + } } } } @@ -592,6 +733,15 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) { u->rtpoll = pa_rtpoll_new(); u->rtpoll_item = NULL; u->latency = RAOP_DEFAULT_LATENCY; + u->autoreconnect = false; + u->server = pa_xstrdup(server); + + if (pa_modargs_get_value_boolean(ma, "autoreconnect", &u->autoreconnect) < 0) { + pa_log("Failed to parse autoreconnect argument"); + goto fail; + } + /* Linked for now, potentially ready for additional parameter */ + u->autonull = u->autoreconnect; if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) { pa_log("Failed to parse latency_msec argument"); @@ -723,7 +873,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) { pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_rtpoll(u->sink, u->rtpoll); - u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec); + u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec, u->autoreconnect); if (!(u->raop)) { pa_log("Failed to create RAOP client object"); @@ -734,6 +884,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) { pa_raop_client_get_frames_per_block(u->raop, &u->block_size); u->block_size *= pa_frame_size(&ss); pa_sink_set_max_request(u->sink, u->block_size); + u->block_usec = pa_bytes_to_usec(u->block_size, &u->sink->sample_spec); pa_raop_client_set_state_callback(u->raop, raop_state_cb, u); @@ -799,6 +950,8 @@ static void userdata_free(struct userdata *u) { if (u->card) pa_card_free(u->card); + if (u->server) + pa_xfree(u->server); pa_xfree(u); } diff --git a/src/modules/rtp/rtsp_client.c b/src/modules/rtp/rtsp_client.c index 34210f9e7..9fd386abe 100644 --- a/src/modules/rtp/rtsp_client.c +++ b/src/modules/rtp/rtsp_client.c @@ -27,6 +27,8 @@ #include <unistd.h> #include <sys/ioctl.h> #include <netinet/in.h> +#include <pulse/rtclock.h> +#include <pulse/timeval.h> #ifdef HAVE_SYS_FILIO_H #include <sys/filio.h> @@ -42,9 +44,12 @@ #include <pulsecore/ioline.h> #include <pulsecore/arpa-inet.h> #include <pulsecore/random.h> +#include <pulsecore/core-rtclock.h> #include "rtsp_client.h" +#define RECONNECT_INTERVAL (5 * PA_USEC_PER_SEC) + struct pa_rtsp_client { pa_mainloop_api *mainloop; char *hostname; @@ -73,9 +78,11 @@ struct pa_rtsp_client { uint32_t cseq; char *session; char *transport; + pa_time_event *reconnect_event; + bool autoreconnect; }; -pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent) { +pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect) { pa_rtsp_client *c; pa_assert(mainloop); @@ -93,12 +100,23 @@ pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostna else c->useragent = "PulseAudio RTSP Client"; + c->autoreconnect = autoreconnect; return c; } +static void free_events(pa_rtsp_client *c) { + pa_assert(c); + + if (c->reconnect_event) { + c->mainloop->time_free(c->reconnect_event); + c->reconnect_event = NULL; + } +} + void pa_rtsp_client_free(pa_rtsp_client *c) { pa_assert(c); + free_events(c); if (c->sc) pa_socket_client_unref(c->sc); @@ -293,6 +311,13 @@ static void line_callback(pa_ioline *line, const char *s, void *userdata) { pa_xfree(s2); } +static void reconnect_cb(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) { + if (userdata) { + pa_rtsp_client *c = userdata; + pa_rtsp_connect(c); + } +} + static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) { pa_rtsp_client *c = userdata; union { @@ -310,7 +335,18 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata c->sc = NULL; if (!io) { - pa_log("Connection failed: %s", pa_cstrerror(errno)); + if (c->autoreconnect) { + struct timeval tv; + + pa_log_warn("Connection to server %s:%d failed: %s - will try later", c->hostname, c->port, pa_cstrerror(errno)); + + if (!c->reconnect_event) + c->reconnect_event = c->mainloop->time_new(c->mainloop, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true), reconnect_cb, c); + else + c->mainloop->time_restart(c->reconnect_event, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true)); + } else { + pa_log("Connection to server %s:%d failed: %s", c->hostname, c->port, pa_cstrerror(errno)); + } return; } pa_assert(!c->ioline); diff --git a/src/modules/rtp/rtsp_client.h b/src/modules/rtp/rtsp_client.h index 4e031d801..259308581 100644 --- a/src/modules/rtp/rtsp_client.h +++ b/src/modules/rtp/rtsp_client.h @@ -54,7 +54,7 @@ typedef enum pa_rtsp_status { typedef void (*pa_rtsp_cb_t)(pa_rtsp_client *c, pa_rtsp_state_t state, pa_rtsp_status_t code, pa_headerlist *headers, void *userdata); -pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent); +pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect); void pa_rtsp_client_free(pa_rtsp_client *c); int pa_rtsp_connect(pa_rtsp_client *c); |