summaryrefslogtreecommitdiff
path: root/gst/rtsp-server/rtsp-client.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtsp-server/rtsp-client.c')
-rw-r--r--gst/rtsp-server/rtsp-client.c168
1 files changed, 153 insertions, 15 deletions
diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c
index 811ae03..70b0e23 100644
--- a/gst/rtsp-server/rtsp-client.c
+++ b/gst/rtsp-server/rtsp-client.c
@@ -190,9 +190,6 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1);
}
-#if 0
- gst_rtsp_connection_send (client->connection, response, &timeout);
-#endif
gst_rtsp_watch_queue_message (client->watch, response);
gst_rtsp_message_unset (response);
}
@@ -297,6 +294,81 @@ no_prepare:
}
static gboolean
+do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client)
+{
+ GstRTSPMessage message = { 0 };
+ guint8 *data;
+ guint size;
+
+ gst_rtsp_message_init_data (&message, channel);
+
+ data = GST_BUFFER_DATA (buffer);
+ size = GST_BUFFER_SIZE (buffer);
+ gst_rtsp_message_take_body (&message, data, size);
+
+ gst_rtsp_watch_queue_message (client->watch, &message);
+
+ gst_rtsp_message_steal_body (&message, &data, &size);
+ gst_rtsp_message_unset (&message);
+
+ return TRUE;
+}
+
+static void
+link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
+{
+ gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data,
+ (GstRTSPSendFunc) do_send_data, client, NULL);
+ client->streams = g_list_prepend (client->streams, stream);
+}
+
+static void
+unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
+{
+ gst_rtsp_session_stream_set_callbacks (stream, NULL,
+ NULL, client, g_object_unref);
+ client->streams = g_list_remove (client->streams, stream);
+}
+
+static void
+unlink_streams (GstRTSPClient *client)
+{
+ GList *walk;
+
+ for (walk = client->streams; walk; walk = g_list_next (walk)) {
+ GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
+
+ gst_rtsp_session_stream_set_callbacks (stream, NULL,
+ NULL, NULL, NULL);
+ }
+ g_list_free (client->streams);
+ client->streams = NULL;
+}
+
+static void
+unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media)
+{
+ guint n_streams, i;
+
+ n_streams = gst_rtsp_media_n_streams (media->media);
+ for (i = 0; i < n_streams; i++) {
+ GstRTSPSessionStream *sstream;
+ GstRTSPTransport *tr;
+
+ /* get the stream as configured in the session */
+ sstream = gst_rtsp_session_media_get_stream (media, i);
+ /* get the transport, if there is no transport configured, skip this stream */
+ if (!(tr = sstream->trans.transport))
+ continue;
+
+ if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
+ /* for TCP, unlink the stream from the TCP connection of the client */
+ unlink_stream (client, sstream);
+ }
+ }
+}
+
+static gboolean
handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
{
GstRTSPSessionMedia *media;
@@ -311,7 +383,10 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
if (!media)
goto not_found;
- gst_rtsp_session_media_stop (media);
+ /* unlink the all TCP callbacks */
+ unlink_session_streams (client, media);
+
+ gst_rtsp_session_media_set_state (media, GST_STATE_NULL);
/* unmanage the media in the session, returns false if all media session
* are torn down. */
@@ -360,7 +435,11 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
media->state != GST_RTSP_STATE_RECORDING)
goto invalid_state;
- gst_rtsp_session_media_pause (media);
+ /* unlink the all TCP callbacks */
+ unlink_session_streams (client, media);
+
+ /* then pause sending */
+ gst_rtsp_session_media_set_state (media, GST_STATE_PAUSED);
/* construct the response now */
code = GST_RTSP_STS_OK;
@@ -420,10 +499,23 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
n_streams = gst_rtsp_media_n_streams (media->media);
for (i = 0; i < n_streams; i++) {
+ GstRTSPSessionStream *sstream;
GstRTSPMediaStream *stream;
+ GstRTSPTransport *tr;
gchar *uristr;
- stream = gst_rtsp_media_get_stream (media->media, i);
+ /* get the stream as configured in the session */
+ sstream = gst_rtsp_session_media_get_stream (media, i);
+ /* get the transport, if there is no transport configured, skip this stream */
+ if (!(tr = sstream->trans.transport))
+ continue;
+
+ if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
+ /* for TCP, link the stream to the TCP connection of the client */
+ link_stream (client, sstream);
+ }
+
+ stream = sstream->media_stream;
g_object_get (G_OBJECT (stream->payloader), "seqnum", &seqnum, NULL);
g_object_get (G_OBJECT (stream->payloader), "timestamp", &timestamp, NULL);
@@ -451,7 +543,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
send_response (client, session, &response);
/* start playing after sending the request */
- gst_rtsp_session_media_play (media);
+ gst_rtsp_session_media_set_state (media, GST_STATE_PLAYING);
media->state = GST_RTSP_STATE_PLAYING;
@@ -594,12 +686,6 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
if (!(stream = gst_rtsp_session_media_get_stream (media, streamid)))
goto no_stream;
- /* setup the server transport from the client transport */
- if (ct->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
- ct->port.min = gst_rtsp_connection_get_readfd (client->connection);
- ct->port.max = gst_rtsp_connection_get_writefd (client->connection);
- }
-
st = gst_rtsp_session_stream_set_transport (stream, ct);
/* serialize the server transport */
@@ -806,6 +892,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request)
gst_rtsp_message_dump (request);
#endif
+ g_message ("client %p: received a request", client);
+
gst_rtsp_message_parse_request (request, &method, &uristr, &version);
if (version != GST_RTSP_VERSION_1_0) {
@@ -889,6 +977,54 @@ session_not_found:
}
}
+static void
+handle_data (GstRTSPClient *client, GstRTSPMessage *message)
+{
+ GstRTSPResult res;
+ guint8 channel;
+ GList *walk;
+ guint8 *data;
+ guint size;
+ GstBuffer *buffer;
+
+ /* find the stream for this message */
+ res = gst_rtsp_message_parse_data (message, &channel);
+ if (res != GST_RTSP_OK)
+ return;
+
+ gst_rtsp_message_steal_body (message, &data, &size);
+
+ buffer = gst_buffer_new ();
+ GST_BUFFER_DATA (buffer) = data;
+ GST_BUFFER_MALLOCDATA (buffer) = data;
+ GST_BUFFER_SIZE (buffer) = size;
+
+ for (walk = client->streams; walk; walk = g_list_next (walk)) {
+ GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
+ GstRTSPMediaStream *mstream;
+ GstRTSPTransport *tr;
+
+ /* get the transport, if there is no transport configured, skip this stream */
+ if (!(tr = stream->trans.transport))
+ continue;
+
+ /* we also need a media stream */
+ if (!(mstream = stream->media_stream))
+ continue;
+
+ /* check for TCP transport */
+ if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
+ /* dispatch to the stream based on the channel number */
+ if (tr->interleaved.min == channel) {
+ gst_rtsp_media_stream_rtp (mstream, buffer);
+ } else if (tr->interleaved.max == channel) {
+ gst_rtsp_media_stream_rtcp (mstream, buffer);
+ }
+ }
+ }
+ gst_buffer_unref (buffer);
+}
+
/**
* gst_rtsp_client_set_timeout:
* @client: a #GstRTSPClient
@@ -1008,8 +1144,6 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
- g_message ("client %p: received a message", client);
-
switch (message->type) {
case GST_RTSP_MESSAGE_REQUEST:
handle_request (client, message);
@@ -1017,6 +1151,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
case GST_RTSP_MESSAGE_RESPONSE:
break;
case GST_RTSP_MESSAGE_DATA:
+ handle_data (client, message);
break;
default:
break;
@@ -1048,6 +1183,9 @@ closed (GstRTSPWatch *watch, gpointer user_data)
g_mutex_unlock (tunnels_lock);
}
+ /* remove all streams that are streaming over this client connection */
+ unlink_streams (client);
+
return GST_RTSP_OK;
}