diff options
Diffstat (limited to 'gst/rtsp-server/rtsp-media.c')
-rw-r--r-- | gst/rtsp-server/rtsp-media.c | 339 |
1 files changed, 223 insertions, 116 deletions
diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index 6284612..b75d386 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -69,7 +69,6 @@ static void gst_rtsp_media_init (GstRTSPMedia * media) { media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *)); - media->complete = FALSE; media->is_live = FALSE; media->buffering = FALSE; } @@ -251,6 +250,46 @@ gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx) return res; } +/** + * gst_rtsp_media_stream_rtp: + * @stream: a #GstRTSPMediaStream + * @buffer: a #GstBuffer + * + * Handle an RTP buffer for the stream. This method is usually called when a + * message has been received from a client using the TCP transport. + * + * Returns: a GstFlowReturn. + */ +GstFlowReturn +gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer) +{ + GstFlowReturn ret; + + g_signal_emit_by_name (stream->appsrc[0], "push-buffer", buffer, &ret); + + return ret; +} + +/** + * gst_rtsp_media_stream_rtcp: + * @stream: a #GstRTSPMediaStream + * @buffer: a #GstBuffer + * + * Handle an RTCP buffer for the stream. This method is usually called when a + * message has been received from a client using the TCP transport. + * + * Returns: a GstFlowReturn. + */ +GstFlowReturn +gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer) +{ + GstFlowReturn ret; + + g_signal_emit_by_name (stream->appsrc[1], "push-buffer", buffer, &ret); + + return GST_FLOW_ERROR; +} + /* Allocate the udp ports and sockets */ static gboolean alloc_udp_ports (GstRTSPMediaStream * stream) @@ -461,19 +500,67 @@ on_timeout (GObject *session, GObject *source, GstRTSPMedia *media) g_message ("%p: source %p timeout", media, source); } +static void +handle_new_buffer (GstElement *sink, GstRTSPMediaStream *stream) +{ + GList *walk; + GstBuffer *buffer; + + g_signal_emit_by_name (sink, "pull-buffer", &buffer); + if (!buffer) + return; + + for (walk = stream->transports; walk; walk = g_list_next (walk)) { + GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data; + + if (sink == stream->appsink[0]) { + if (tr->send_rtp) + tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data); + } + else { + if (tr->send_rtcp) + tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data); + } + } + gst_buffer_unref (buffer); +} + /* prepare the pipeline objects to handle @stream in @media */ static gboolean setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) { gchar *name; - GstPad *pad; + GstPad *pad, *teepad, *selpad; + GstPadLinkReturn ret; + gint i; + GstElement *tee, *selector; - alloc_udp_ports (stream); + /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2 + * for sending RTP/RTCP. The sender and receiver ports are shared between the + * elements */ + if (!alloc_udp_ports (stream)) + return FALSE; - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[0]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[1]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[0]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[1]); + /* add the ports to the pipeline */ + for (i = 0; i < 2; i++) { + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]); + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]); + } + + /* create elements for the TCP transfer */ + for (i = 0; i < 2; i++) { + stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL); + stream->appsink[i] = gst_element_factory_make ("appsink", NULL); + g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL); + g_object_set (stream->appsink[i], "emit-signals", TRUE, NULL); + g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]); + gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]); + } + g_signal_connect (stream->appsink[0], "new-buffer", + (GCallback) handle_new_buffer, stream); + g_signal_connect (stream->appsink[1], "new-buffer", + (GCallback) handle_new_buffer, stream); /* hook up the stream to the RTP session elements. */ name = g_strdup_printf ("send_rtp_sink_%d", idx); @@ -505,19 +592,73 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) media); /* link the RTP pad to the session manager */ - gst_pad_link (stream->srcpad, stream->send_rtp_sink); + ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink); + if (ret != GST_PAD_LINK_OK) + goto link_failed; - /* link udp elements */ - pad = gst_element_get_static_pad (stream->udpsink[0], "sink"); + /* make tee for RTP and link to stream */ + tee = gst_element_factory_make ("tee", NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), tee); + + pad = gst_element_get_static_pad (tee, "sink"); gst_pad_link (stream->send_rtp_src, pad); gst_object_unref (pad); - pad = gst_element_get_static_pad (stream->udpsink[1], "sink"); + + /* link RTP sink, we're pretty sure this will work. */ + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->udpsink[0], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->appsink[0], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* make tee for RTCP */ + tee = gst_element_factory_make ("tee", NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), tee); + + pad = gst_element_get_static_pad (tee, "sink"); gst_pad_link (stream->send_rtcp_src, pad); gst_object_unref (pad); - pad = gst_element_get_static_pad (stream->udpsrc[1], "src"); + + /* link RTCP elements */ + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->udpsink[1], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + teepad = gst_element_get_request_pad (tee, "src%d"); + pad = gst_element_get_static_pad (stream->appsink[1], "sink"); + gst_pad_link (teepad, pad); + gst_object_unref (pad); + gst_object_unref (teepad); + + /* make selector for the RTCP receivers */ + selector = gst_element_factory_make ("input-selector", NULL); + g_object_set (selector, "select-all", TRUE, NULL); + gst_bin_add (GST_BIN_CAST (media->pipeline), selector); + + pad = gst_element_get_static_pad (selector, "src"); gst_pad_link (pad, stream->recv_rtcp_sink); gst_object_unref (pad); + selpad = gst_element_get_request_pad (selector, "sink%d"); + pad = gst_element_get_static_pad (stream->udpsrc[1], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + + selpad = gst_element_get_request_pad (selector, "sink%d"); + pad = gst_element_get_static_pad (stream->appsrc[1], "src"); + gst_pad_link (pad, selpad); + gst_object_unref (pad); + gst_object_unref (selpad); + /* we set and keep these to playing so that they don't cause NO_PREROLL return * values */ gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING); @@ -532,6 +673,13 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media) stream->prepared = TRUE; return TRUE; + + /* ERRORS */ +link_failed: + { + g_warning ("failed to link stream %d", idx); + return FALSE; + } } static void @@ -648,6 +796,17 @@ default_handle_message (GstRTSPMedia *media, GstMessage *message) g_free (debug); break; } + case GST_MESSAGE_WARNING: + { + GError *gerror; + gchar *debug; + + gst_message_parse_warning (message, &gerror, &debug); + g_warning ("%p: got warning %s (%s)", media, gerror->message, debug); + g_error_free (gerror); + g_free (debug); + break; + } default: g_message ("%p: got message type %s", media, gst_message_type_get_name (type)); break; @@ -678,6 +837,9 @@ bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media) * Prepare @media for streaming. This function will create the pipeline and * other objects to manage the streaming. * + * It will preroll the pipeline and collect vital information about the streams + * such as the duration. + * * Returns: %TRUE on success. */ gboolean @@ -736,19 +898,22 @@ gst_rtsp_media_prepare (GstRTSPMedia *media) g_message ("live media %p", media); media->is_live = TRUE; ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) + goto state_failed; break; case GST_STATE_CHANGE_FAILURE: - { - unlock_streams (media); goto state_failed; - } } /* now wait for all pads to be prerolled */ ret = gst_element_get_state (media->pipeline, NULL, NULL, -1); + if (ret == GST_STATE_CHANGE_FAILURE) + goto state_failed; /* and back to PAUSED for live pipelines */ ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED); + if (ret == GST_STATE_CHANGE_FAILURE) + goto state_failed; /* collect stats about the media */ collect_media_stats (media); @@ -770,95 +935,58 @@ was_prepared: /* ERRORS */ state_failed: { + g_warning ("failed to preroll pipeline"); + unlock_streams (media); gst_element_set_state (media->pipeline, GST_STATE_NULL); return FALSE; } } /** - * gst_rtsp_media_play: + * gst_rtsp_media_set_state: * @media: a #GstRTSPMedia + * @state: the target state of the media * @transports: a GArray of #GstRTSPMediaTrans pointers * - * Start playing @media for to the transports in @transports. + * Set the state of @media to @state and for the transports in @transports. * * Returns: %TRUE on success. */ gboolean -gst_rtsp_media_play (GstRTSPMedia *media, GArray *transports) +gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports) { gint i; GstStateChangeReturn ret; + gboolean add, remove; g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); g_return_val_if_fail (transports != NULL, FALSE); g_return_val_if_fail (media->prepared, FALSE); - if (media->target_state == GST_STATE_PLAYING) - return TRUE; + /* NULL and READY are the same */ + if (state == GST_STATE_READY) + state = GST_STATE_NULL; - for (i = 0; i < transports->len; i++) { - GstRTSPMediaTrans *tr; - GstRTSPMediaStream *stream; - GstRTSPTransport *trans; - - /* we need a non-NULL entry in the array */ - tr = g_array_index (transports, GstRTSPMediaTrans *, i); - if (tr == NULL) - continue; - - /* we need a transport */ - if (!(trans = tr->transport)) - continue; + if (media->target_state == state) + return TRUE; - /* get the stream and add the destinations */ - stream = gst_rtsp_media_get_stream (media, tr->idx); - switch (trans->lower_transport) { - case GST_RTSP_LOWER_TRANS_UDP: - case GST_RTSP_LOWER_TRANS_UDP_MCAST: - g_message ("adding %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); + add = remove = FALSE; - g_signal_emit_by_name (stream->udpsink[0], "add", trans->destination, trans->client_port.min, NULL); - g_signal_emit_by_name (stream->udpsink[1], "add", trans->destination, trans->client_port.max, NULL); - break; - case GST_RTSP_LOWER_TRANS_TCP: - g_message ("TCP transport not yet implemented"); - break; - default: - g_message ("Unknown transport %d", trans->lower_transport); - break; - } + switch (state) { + case GST_STATE_NULL: + case GST_STATE_PAUSED: + /* we're going from PLAYING to READY or NULL, remove */ + if (media->target_state == GST_STATE_PLAYING) + remove = TRUE; + break; + case GST_STATE_PLAYING: + /* we're going to PLAYING, add */ + add = TRUE; + break; + default: + break; } - g_message ("playing media %p", media); - media->target_state = GST_STATE_PLAYING; - ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); - - return TRUE; -} - -/** - * gst_rtsp_media_pause: - * @media: a #GstRTSPMedia - * @transports: a array of #GstRTSPTransport pointers - * - * Pause playing @media for to the transports in @transports. - * - * Returns: %TRUE on success. - */ -gboolean -gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports) -{ - gint i; - GstStateChangeReturn ret; - - g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - g_return_val_if_fail (transports != NULL, FALSE); - g_return_val_if_fail (media->prepared, FALSE); - - if (media->target_state == GST_STATE_PAUSED) - return TRUE; - for (i = 0; i < transports->len; i++) { GstRTSPMediaTrans *tr; GstRTSPMediaStream *stream; @@ -875,56 +1003,35 @@ gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports) /* get the stream and add the destinations */ stream = gst_rtsp_media_get_stream (media, tr->idx); - switch (trans->lower_transport) { case GST_RTSP_LOWER_TRANS_UDP: case GST_RTSP_LOWER_TRANS_UDP_MCAST: - g_message ("removing %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); - - g_signal_emit_by_name (stream->udpsink[0], "remove", trans->destination, trans->client_port.min, NULL); - g_signal_emit_by_name (stream->udpsink[1], "remove", trans->destination, trans->client_port.max, NULL); + if (add) { + g_message ("adding %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); + g_signal_emit_by_name (stream->udpsink[0], "add", trans->destination, trans->client_port.min, NULL); + g_signal_emit_by_name (stream->udpsink[1], "add", trans->destination, trans->client_port.max, NULL); + } else if (remove) { + g_message ("removing %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max); + g_signal_emit_by_name (stream->udpsink[0], "remove", trans->destination, trans->client_port.min, NULL); + g_signal_emit_by_name (stream->udpsink[1], "remove", trans->destination, trans->client_port.max, NULL); + } break; case GST_RTSP_LOWER_TRANS_TCP: - g_message ("TCP transport not yet implemented"); + if (add) { + stream->transports = g_list_prepend (stream->transports, tr); + } else if (remove) { + stream->transports = g_list_remove (stream->transports, tr); + } break; default: g_message ("Unknown transport %d", trans->lower_transport); break; } } - g_message ("pause media %p", media); - media->target_state = GST_STATE_PAUSED; - ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED); - - return TRUE; -} - -/** - * gst_rtsp_media_stream_stop: - * @media: a #GstRTSPMedia - * @transports: a GArray of #GstRTSPMediaTrans pointers - * - * Stop playing @media for to the transports in @transports. - * - * Returns: %TRUE on success. - */ -gboolean -gst_rtsp_media_stop (GstRTSPMedia *media, GArray *transports) -{ - GstStateChangeReturn ret; - - g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - g_return_val_if_fail (transports != NULL, FALSE); - g_return_val_if_fail (media->prepared, FALSE); - - if (media->target_state == GST_STATE_NULL) - return TRUE; - - gst_rtsp_media_pause (media, transports); - g_message ("stop media %p", media); - media->target_state = GST_STATE_NULL; - ret = gst_element_set_state (media->pipeline, GST_STATE_NULL); + g_message ("state %s media %p", gst_element_state_get_name (state), media); + media->target_state = state; + ret = gst_element_set_state (media->pipeline, state); return TRUE; } |