diff options
Diffstat (limited to 'gst/rtsp-server/rtsp-client.c')
-rw-r--r-- | gst/rtsp-server/rtsp-client.c | 168 |
1 files changed, 153 insertions, 15 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index 811ae03..70b0e23 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -190,9 +190,6 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1); } -#if 0 - gst_rtsp_connection_send (client->connection, response, &timeout); -#endif gst_rtsp_watch_queue_message (client->watch, response); gst_rtsp_message_unset (response); } @@ -297,6 +294,81 @@ no_prepare: } static gboolean +do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client) +{ + GstRTSPMessage message = { 0 }; + guint8 *data; + guint size; + + gst_rtsp_message_init_data (&message, channel); + + data = GST_BUFFER_DATA (buffer); + size = GST_BUFFER_SIZE (buffer); + gst_rtsp_message_take_body (&message, data, size); + + gst_rtsp_watch_queue_message (client->watch, &message); + + gst_rtsp_message_steal_body (&message, &data, &size); + gst_rtsp_message_unset (&message); + + return TRUE; +} + +static void +link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream) +{ + gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data, + (GstRTSPSendFunc) do_send_data, client, NULL); + client->streams = g_list_prepend (client->streams, stream); +} + +static void +unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream) +{ + gst_rtsp_session_stream_set_callbacks (stream, NULL, + NULL, client, g_object_unref); + client->streams = g_list_remove (client->streams, stream); +} + +static void +unlink_streams (GstRTSPClient *client) +{ + GList *walk; + + for (walk = client->streams; walk; walk = g_list_next (walk)) { + GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data; + + gst_rtsp_session_stream_set_callbacks (stream, NULL, + NULL, NULL, NULL); + } + g_list_free (client->streams); + client->streams = NULL; +} + +static void +unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media) +{ + guint n_streams, i; + + n_streams = gst_rtsp_media_n_streams (media->media); + for (i = 0; i < n_streams; i++) { + GstRTSPSessionStream *sstream; + GstRTSPTransport *tr; + + /* get the stream as configured in the session */ + sstream = gst_rtsp_session_media_get_stream (media, i); + /* get the transport, if there is no transport configured, skip this stream */ + if (!(tr = sstream->trans.transport)) + continue; + + if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* for TCP, unlink the stream from the TCP connection of the client */ + unlink_stream (client, sstream); + } + } +} + +static gboolean handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request) { GstRTSPSessionMedia *media; @@ -311,7 +383,10 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession if (!media) goto not_found; - gst_rtsp_session_media_stop (media); + /* unlink the all TCP callbacks */ + unlink_session_streams (client, media); + + gst_rtsp_session_media_set_state (media, GST_STATE_NULL); /* unmanage the media in the session, returns false if all media session * are torn down. */ @@ -360,7 +435,11 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se media->state != GST_RTSP_STATE_RECORDING) goto invalid_state; - gst_rtsp_session_media_pause (media); + /* unlink the all TCP callbacks */ + unlink_session_streams (client, media); + + /* then pause sending */ + gst_rtsp_session_media_set_state (media, GST_STATE_PAUSED); /* construct the response now */ code = GST_RTSP_STS_OK; @@ -420,10 +499,23 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses n_streams = gst_rtsp_media_n_streams (media->media); for (i = 0; i < n_streams; i++) { + GstRTSPSessionStream *sstream; GstRTSPMediaStream *stream; + GstRTSPTransport *tr; gchar *uristr; - stream = gst_rtsp_media_get_stream (media->media, i); + /* get the stream as configured in the session */ + sstream = gst_rtsp_session_media_get_stream (media, i); + /* get the transport, if there is no transport configured, skip this stream */ + if (!(tr = sstream->trans.transport)) + continue; + + if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* for TCP, link the stream to the TCP connection of the client */ + link_stream (client, sstream); + } + + stream = sstream->media_stream; g_object_get (G_OBJECT (stream->payloader), "seqnum", &seqnum, NULL); g_object_get (G_OBJECT (stream->payloader), "timestamp", ×tamp, NULL); @@ -451,7 +543,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses send_response (client, session, &response); /* start playing after sending the request */ - gst_rtsp_session_media_play (media); + gst_rtsp_session_media_set_state (media, GST_STATE_PLAYING); media->state = GST_RTSP_STATE_PLAYING; @@ -594,12 +686,6 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se if (!(stream = gst_rtsp_session_media_get_stream (media, streamid))) goto no_stream; - /* setup the server transport from the client transport */ - if (ct->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { - ct->port.min = gst_rtsp_connection_get_readfd (client->connection); - ct->port.max = gst_rtsp_connection_get_writefd (client->connection); - } - st = gst_rtsp_session_stream_set_transport (stream, ct); /* serialize the server transport */ @@ -806,6 +892,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request) gst_rtsp_message_dump (request); #endif + g_message ("client %p: received a request", client); + gst_rtsp_message_parse_request (request, &method, &uristr, &version); if (version != GST_RTSP_VERSION_1_0) { @@ -889,6 +977,54 @@ session_not_found: } } +static void +handle_data (GstRTSPClient *client, GstRTSPMessage *message) +{ + GstRTSPResult res; + guint8 channel; + GList *walk; + guint8 *data; + guint size; + GstBuffer *buffer; + + /* find the stream for this message */ + res = gst_rtsp_message_parse_data (message, &channel); + if (res != GST_RTSP_OK) + return; + + gst_rtsp_message_steal_body (message, &data, &size); + + buffer = gst_buffer_new (); + GST_BUFFER_DATA (buffer) = data; + GST_BUFFER_MALLOCDATA (buffer) = data; + GST_BUFFER_SIZE (buffer) = size; + + for (walk = client->streams; walk; walk = g_list_next (walk)) { + GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data; + GstRTSPMediaStream *mstream; + GstRTSPTransport *tr; + + /* get the transport, if there is no transport configured, skip this stream */ + if (!(tr = stream->trans.transport)) + continue; + + /* we also need a media stream */ + if (!(mstream = stream->media_stream)) + continue; + + /* check for TCP transport */ + if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* dispatch to the stream based on the channel number */ + if (tr->interleaved.min == channel) { + gst_rtsp_media_stream_rtp (mstream, buffer); + } else if (tr->interleaved.max == channel) { + gst_rtsp_media_stream_rtcp (mstream, buffer); + } + } + } + gst_buffer_unref (buffer); +} + /** * gst_rtsp_client_set_timeout: * @client: a #GstRTSPClient @@ -1008,8 +1144,6 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da { GstRTSPClient *client = GST_RTSP_CLIENT (user_data); - g_message ("client %p: received a message", client); - switch (message->type) { case GST_RTSP_MESSAGE_REQUEST: handle_request (client, message); @@ -1017,6 +1151,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da case GST_RTSP_MESSAGE_RESPONSE: break; case GST_RTSP_MESSAGE_DATA: + handle_data (client, message); break; default: break; @@ -1048,6 +1183,9 @@ closed (GstRTSPWatch *watch, gpointer user_data) g_mutex_unlock (tunnels_lock); } + /* remove all streams that are streaming over this client connection */ + unlink_streams (client); + return GST_RTSP_OK; } |