diff options
author | Wim Taymans <wtaymans@redhat.com> | 2016-07-21 18:38:24 +0200 |
---|---|---|
committer | Wim Taymans <wtaymans@redhat.com> | 2016-07-21 18:38:24 +0200 |
commit | af3de36416d01db436293913c261ffe1d28d91cd (patch) | |
tree | cb5b81ec6dd863ed9e0ea3ceecd689691409cdcf | |
parent | 31041a4e16c718475223cf07a7a39716e30659fa (diff) |
work on stream negotiation and start
Add more buffer types to add and remove memory shared memory between the
server and client. We would like to send buffers only once and then
simply reference them by index.
Do format negotiation and stream start with a START message.
-rw-r--r-- | pinos/Makefile.am | 1 | ||||
-rw-r--r-- | pinos/client/buffer.c | 220 | ||||
-rw-r--r-- | pinos/client/buffer.h | 129 | ||||
-rw-r--r-- | pinos/client/fdmanager.c | 37 | ||||
-rw-r--r-- | pinos/client/fdmanager.h | 3 | ||||
-rw-r--r-- | pinos/client/pinos.h | 1 | ||||
-rw-r--r-- | pinos/client/stream.c | 139 | ||||
-rw-r--r-- | pinos/dbus/org.pinos.xml | 18 | ||||
-rw-r--r-- | pinos/gst/gstpinosdepay.c | 52 | ||||
-rw-r--r-- | pinos/gst/gstpinosdepay.h | 1 | ||||
-rw-r--r-- | pinos/gst/gstpinospay.c | 36 | ||||
-rw-r--r-- | pinos/gst/gstpinosportsink.c | 38 | ||||
-rw-r--r-- | pinos/gst/gstpinosportsrc.c | 67 | ||||
-rw-r--r-- | pinos/gst/gstpinosportsrc.h | 1 | ||||
-rw-r--r-- | pinos/gst/gstpinossink.c | 55 | ||||
-rw-r--r-- | pinos/gst/gstpinossink.h | 4 | ||||
-rw-r--r-- | pinos/gst/gstpinossocketsink.c | 46 | ||||
-rw-r--r-- | pinos/gst/gstpinossrc.c | 80 | ||||
-rw-r--r-- | pinos/gst/gstpinossrc.h | 1 | ||||
-rw-r--r-- | pinos/server/channel.c | 186 | ||||
-rw-r--r-- | pinos/server/link.c | 401 | ||||
-rw-r--r-- | pinos/server/link.h | 75 | ||||
-rw-r--r-- | pinos/server/port.c | 2 |
23 files changed, 1245 insertions, 348 deletions
diff --git a/pinos/Makefile.am b/pinos/Makefile.am index b1f1200e..6c9eca56 100644 --- a/pinos/Makefile.am +++ b/pinos/Makefile.am @@ -208,6 +208,7 @@ libpinoscore_@PINOS_MAJORMINOR@_la_SOURCES = \ server/channel.c server/channel.h \ server/client.c server/client.h \ server/daemon.c server/daemon.h \ + server/link.c server/link.h \ server/node.c server/node.h \ server/port.c server/port.h \ server/node-factory.c server/node-factory.h \ diff --git a/pinos/client/buffer.c b/pinos/client/buffer.c index 63f54f79..ec7f088f 100644 --- a/pinos/client/buffer.c +++ b/pinos/client/buffer.c @@ -459,8 +459,8 @@ pinos_buffer_builder_init_full (PinosBufferBuilder *builder, sb->buf.max_size = sizeof (PinosStackHeader) + 128; sb->buf.data = g_malloc (sb->buf.max_size); sb->buf.free_data = sb->buf.data; -// g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, -// builder, max_data, sb->buf.max_size); + g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, + builder, max_data, sb->buf.max_size); } else { sb->buf.max_size = max_data; sb->buf.data = data; @@ -581,8 +581,8 @@ pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, if (sb->buf.n_fds >= sb->buf.max_fds) { gint new_size = sb->buf.max_fds + 8; -// g_warning ("builder %p: realloc buffer fds %d -> %d", -// builder, sb->buf.max_fds, new_size); + g_warning ("builder %p: realloc buffer fds %d -> %d", + builder, sb->buf.max_fds, new_size); sb->buf.max_fds = new_size; sb->buf.free_fds = g_realloc (sb->buf.free_fds, new_size * sizeof (int)); sb->buf.fds = sb->buf.free_fds; @@ -599,8 +599,8 @@ builder_ensure_size (struct stack_builder *sb, gsize size) { if (sb->buf.size + size > sb->buf.max_size) { gsize new_size = sb->buf.size + MAX (size, 1024); -// g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, -// sb, sb->buf.max_size, new_size); + g_warning ("builder %p: realloc buffer memory %"G_GSIZE_FORMAT" -> %"G_GSIZE_FORMAT, + sb, sb->buf.max_size, new_size); sb->buf.max_size = new_size; sb->buf.free_data = g_realloc (sb->buf.free_data, new_size); sb->sh = sb->buf.data = sb->buf.free_data; @@ -634,20 +634,144 @@ builder_add_packet (struct stack_builder *sb, PinosPacketType type, gsize size) return p; } +/** + * pinos_buffer_builder_add_empty: + * @builder: a #PinosBufferBuilder + * @type: a #PinosPacketType + * + * Add an empty packet of @type. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_empty (PinosBufferBuilder *builder, + PinosPacketType type) +{ + struct stack_builder *sb = PPSB (builder); + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + builder_add_packet (sb, type, 0); + + return TRUE; +} + +/** + * pinos_buffer_iter_parse_add_mem: + * @iter: a #PinosBufferIter + * @payload: a #PinosPacketAddMem + * + * Get the #PinosPacketAddMem. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_ADD_MEM + * + * Returns: %TRUE if @payload contains valid data. + */ +gboolean +pinos_buffer_iter_parse_add_mem (PinosBufferIter *iter, + PinosPacketAddMem *payload) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_ADD_MEM, FALSE); + + if (si->size < sizeof (PinosPacketAddMem)) + return FALSE; + + memcpy (payload, si->data, sizeof (*payload)); + + return TRUE; +} + +/** + * pinos_buffer_builder_add_add_mem: + * @builder: a #PinosBufferBuilder + * @payload: a #PinosPacketAddMem + * + * Add a #PINOS_PACKET_TYPE_ADD_MEM to @builder with data from @payload. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_add_mem (PinosBufferBuilder *builder, + PinosPacketAddMem *payload) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketAddMem *p; + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + p = builder_add_packet (sb, PINOS_PACKET_TYPE_ADD_MEM, sizeof (PinosPacketAddMem)); + memcpy (p, payload, sizeof (*payload)); + + return TRUE; +} + +/** + * pinos_buffer_iter_parse_remove_mem: + * @iter: a #PinosBufferIter + * @payload: a #PinosPacketRemoveMem + * + * Get the #PinosPacketRemoveMem. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_REMOVE_MEM + * + * Returns: %TRUE if @payload contains valid data. + */ +gboolean +pinos_buffer_iter_parse_remove_mem (PinosBufferIter *iter, + PinosPacketRemoveMem *payload) +{ + struct stack_iter *si = PPSI (iter); + + g_return_val_if_fail (is_valid_iter (iter), FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_REMOVE_MEM, FALSE); + + if (si->size < sizeof (PinosPacketRemoveMem)) + return FALSE; + + memcpy (payload, si->data, sizeof (*payload)); + + return TRUE; +} + +/** + * pinos_buffer_builder_add_remove_mem: + * @builder: a #PinosBufferBuilder + * @payload: a #PinosPacketRemoveMem + * + * Add a #PINOS_PACKET_TYPE_REMOVE_MEM to @builder with data from @payload. + * + * Returns: %TRUE on success. + */ +gboolean +pinos_buffer_builder_add_remove_mem (PinosBufferBuilder *builder, + PinosPacketRemoveMem *payload) +{ + struct stack_builder *sb = PPSB (builder); + PinosPacketRemoveMem *p; + + g_return_val_if_fail (is_valid_builder (builder), FALSE); + + p = builder_add_packet (sb, PINOS_PACKET_TYPE_REMOVE_MEM, sizeof (PinosPacketRemoveMem)); + memcpy (p, payload, sizeof (*payload)); + + return TRUE; +} + /* header packets */ /** - * pinos_buffer_iter_get_header: + * pinos_buffer_iter_parse_header: * @iter: a #PinosBufferIter - * @header: a #PinosPacketHeader + * @payload: a #PinosPacketHeader * * Get the #PinosPacketHeader. @iter must be positioned on a packet of * type #PINOS_PACKET_TYPE_HEADER * - * Returns: %TRUE if @header contains valid data. + * Returns: %TRUE if @payload contains valid data. */ gboolean pinos_buffer_iter_parse_header (PinosBufferIter *iter, - PinosPacketHeader *header) + PinosPacketHeader *payload) { struct stack_iter *si = PPSI (iter); @@ -657,7 +781,7 @@ pinos_buffer_iter_parse_header (PinosBufferIter *iter, if (si->size < sizeof (PinosPacketHeader)) return FALSE; - memcpy (header, si->data, sizeof (*header)); + memcpy (payload, si->data, sizeof (*payload)); return TRUE; } @@ -665,48 +789,48 @@ pinos_buffer_iter_parse_header (PinosBufferIter *iter, /** * pinos_buffer_builder_add_header: * @builder: a #PinosBufferBuilder - * @header: a #PinosPacketHeader + * @payload: a #PinosPacketHeader * - * Add a #PINOS_PACKET_TYPE_HEADER to @builder with data from @header. + * Add a #PINOS_PACKET_TYPE_HEADER to @builder with data from @payload. * * Returns: %TRUE on success. */ gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *builder, - PinosPacketHeader *header) + PinosPacketHeader *payload) { struct stack_builder *sb = PPSB (builder); - PinosPacketHeader *h; + PinosPacketHeader *p; g_return_val_if_fail (is_valid_builder (builder), FALSE); - h = builder_add_packet (sb, PINOS_PACKET_TYPE_HEADER, sizeof (PinosPacketHeader)); - memcpy (h, header, sizeof (*header)); + p = builder_add_packet (sb, PINOS_PACKET_TYPE_HEADER, sizeof (PinosPacketHeader)); + memcpy (p, payload, sizeof (*payload)); return TRUE; } -/* fd-payload packets */ +/* process-mem packets */ /** - * pinos_buffer_iter_get_fd_payload: + * pinos_buffer_iter_parse_process_mem: * @iter: a #PinosBufferIter - * @payload: a #PinosPacketFDPayload + * @payload: a #PinosPacketProcessMem * - * Get the #PinosPacketFDPayload. @iter must be positioned on a packet of - * type #PINOS_PACKET_TYPE_FD_PAYLOAD + * Get the #PinosPacketProcessMem. @iter must be positioned on a packet of + * type #PINOS_PACKET_TYPE_PROCESS_MEM * * Returns: %TRUE if @payload contains valid data. */ gboolean -pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, - PinosPacketFDPayload *payload) +pinos_buffer_iter_parse_process_mem (PinosBufferIter *iter, + PinosPacketProcessMem *payload) { struct stack_iter *si = PPSI (iter); g_return_val_if_fail (is_valid_iter (iter), FALSE); - g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_FD_PAYLOAD, FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_PROCESS_MEM, FALSE); - if (si->size < sizeof (PinosPacketFDPayload)) + if (si->size < sizeof (PinosPacketProcessMem)) return FALSE; memcpy (payload, si->data, sizeof (*payload)); @@ -715,49 +839,49 @@ pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, } /** - * pinos_buffer_builder_add_fd_payload: + * pinos_buffer_builder_add_process_mem: * @builder: a #PinosBufferBuilder - * @payload: a #PinosPacketFDPayload + * @payload: a #PinosPacketProcessMem * - * Add a #PINOS_PACKET_TYPE_FD_PAYLOAD to @builder with data from @payload. + * Add a #PINOS_PACKET_TYPE_PROCESS_MEM to @builder with data from @payload. * * Returns: %TRUE on success. */ gboolean -pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder, - PinosPacketFDPayload *payload) +pinos_buffer_builder_add_process_mem (PinosBufferBuilder *builder, + PinosPacketProcessMem *payload) { struct stack_builder *sb = PPSB (builder); - PinosPacketFDPayload *p; + PinosPacketProcessMem *p; g_return_val_if_fail (is_valid_builder (builder), FALSE); g_return_val_if_fail (payload->size > 0, FALSE); - p = builder_add_packet (sb, PINOS_PACKET_TYPE_FD_PAYLOAD, sizeof (PinosPacketFDPayload)); + p = builder_add_packet (sb, PINOS_PACKET_TYPE_PROCESS_MEM, sizeof (PinosPacketProcessMem)); memcpy (p, payload, sizeof (*payload)); return TRUE; } /** - * pinos_buffer_iter_parse_release_fd_payload: + * pinos_buffer_iter_parse_reuse_mem: * @iter: a #PinosBufferIter - * @payload: a #PinosPacketReleaseFDPayload + * @payload: a #PinosPacketReuseMem * - * Parse a #PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD packet from @iter into @payload. + * Parse a #PINOS_PACKET_TYPE_REUSE_MEM packet from @iter into @payload. * * Returns: %TRUE on success */ gboolean -pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, - PinosPacketReleaseFDPayload *payload) +pinos_buffer_iter_parse_reuse_mem (PinosBufferIter *iter, + PinosPacketReuseMem *payload) { struct stack_iter *si = PPSI (iter); g_return_val_if_fail (is_valid_iter (iter), FALSE); - g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD, FALSE); + g_return_val_if_fail (si->type == PINOS_PACKET_TYPE_REUSE_MEM, FALSE); - if (si->size < sizeof (PinosPacketReleaseFDPayload)) + if (si->size < sizeof (PinosPacketReuseMem)) return FALSE; memcpy (payload, si->data, sizeof (*payload)); @@ -766,26 +890,26 @@ pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, } /** - * pinos_buffer_builder_add_release_fd_payload: + * pinos_buffer_builder_add_reuse_mem: * @builder: a #PinosBufferBuilder - * @payload: a #PinosPacketReleaseFDPayload + * @payload: a #PinosPacketReuseMem * - * Add a #PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD payload in @payload to @builder. + * Add a #PINOS_PACKET_TYPE_REUSE_MEM payload in @payload to @builder. * * Returns: %TRUE on success */ gboolean -pinos_buffer_builder_add_release_fd_payload (PinosBufferBuilder *builder, - PinosPacketReleaseFDPayload *payload) +pinos_buffer_builder_add_reuse_mem (PinosBufferBuilder *builder, + PinosPacketReuseMem *payload) { struct stack_builder *sb = PPSB (builder); - PinosPacketReleaseFDPayload *p; + PinosPacketReuseMem *p; g_return_val_if_fail (is_valid_builder (builder), FALSE); p = builder_add_packet (sb, - PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD, - sizeof (PinosPacketReleaseFDPayload)); + PINOS_PACKET_TYPE_REUSE_MEM, + sizeof (PinosPacketReuseMem)); memcpy (p, payload, sizeof (*payload)); return TRUE; diff --git a/pinos/client/buffer.h b/pinos/client/buffer.h index 3750c780..b035d305 100644 --- a/pinos/client/buffer.h +++ b/pinos/client/buffer.h @@ -75,11 +75,16 @@ gint * pinos_buffer_steal_fds (PinosBuffer *buffer, * @PINOS_PACKET_TYPE_INVALID: invalid packet type, ignore * @PINOS_PACKET_TYPE_CONTINUATION: continuation packet, used internally to send * commands using a shared memory region. + * @PINOS_PACKET_TYPE_REGISTER_MEM: register memory region + * @PINOS_PACKET_TYPE_RELEASE_MEM: release memory region + * @PINOS_PACKET_TYPE_START: start transfer + * @PINOS_PACKET_TYPE_STOP: stop transfer + * * @PINOS_PACKET_TYPE_HEADER: common packet header - * @PINOS_PACKET_TYPE_FD_PAYLOAD: packet contains fd-payload. An fd-payload contains - * the media data as a file descriptor - * @PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: packet contains release fd-payload. Notifies - * that a previously received fd-payload is no longer in use. + * @PINOS_PACKET_TYPE_PROCESS_MEM: packet contains mem-payload. An mem-payload contains + * the media data as the index of a shared memory region + * @PINOS_PACKET_TYPE_REUSE_MEM: when a memory region has been consumed and is ready to + * be reused. * @PINOS_PACKET_TYPE_FORMAT_CHANGE: a format change. * @PINOS_PACKET_TYPE_PROPERTY_CHANGE: one or more property changes. * @PINOS_PACKET_TYPE_REFRESH_REQUEST: ask for a new keyframe @@ -89,13 +94,21 @@ gint * pinos_buffer_steal_fds (PinosBuffer *buffer, typedef enum { PINOS_PACKET_TYPE_INVALID = 0, - PINOS_PACKET_TYPE_CONTINUATION = 1, - PINOS_PACKET_TYPE_HEADER = 2, - PINOS_PACKET_TYPE_FD_PAYLOAD = 3, - PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD = 4, - PINOS_PACKET_TYPE_FORMAT_CHANGE = 5, - PINOS_PACKET_TYPE_PROPERTY_CHANGE = 6, - PINOS_PACKET_TYPE_REFRESH_REQUEST = 7, + PINOS_PACKET_TYPE_CONTINUATION, + PINOS_PACKET_TYPE_ADD_MEM, + PINOS_PACKET_TYPE_REMOVE_MEM, + PINOS_PACKET_TYPE_START, + PINOS_PACKET_TYPE_STREAMING, + PINOS_PACKET_TYPE_STOP, + PINOS_PACKET_TYPE_STOPPED, + PINOS_PACKET_TYPE_DRAIN, + PINOS_PACKET_TYPE_DRAINED, + PINOS_PACKET_TYPE_HEADER, + PINOS_PACKET_TYPE_PROCESS_MEM, + PINOS_PACKET_TYPE_REUSE_MEM, + PINOS_PACKET_TYPE_FORMAT_CHANGE, + PINOS_PACKET_TYPE_PROPERTY_CHANGE, + PINOS_PACKET_TYPE_REFRESH_REQUEST, } PinosPacketType; @@ -139,8 +152,51 @@ void pinos_buffer_builder_clear (PinosBufferBuilder *builder) void pinos_buffer_builder_end (PinosBufferBuilder *builder, PinosBuffer *buffer); +gboolean pinos_buffer_builder_add_empty (PinosBufferBuilder *builder, + PinosPacketType type); + gint pinos_buffer_builder_add_fd (PinosBufferBuilder *builder, int fd); +/* add-mem packets */ +/** + * PinosPacketAddMem: + * @id: the unique id of this memory block + * @type: the memory block type + * @fd_index: the index of the fd with the data + * @offset: the offset of the data + * @size: the size of the data + * + * A Packet that contains a memory block used for data transfer. + */ +typedef struct { + guint32 id; + guint32 type; + gint32 fd_index; + guint64 offset; + guint64 size; +} PinosPacketAddMem; + +gboolean pinos_buffer_iter_parse_add_mem (PinosBufferIter *iter, + PinosPacketAddMem *payload); +gboolean pinos_buffer_builder_add_add_mem (PinosBufferBuilder *builder, + PinosPacketAddMem *payload); + +/* remove-mem packets */ +/** + * PinosPacketRemoveMem: + * @id: the unique id of the memory block + * + * Remove a memory block. + */ +typedef struct { + guint32 id; +} PinosPacketRemoveMem; + +gboolean pinos_buffer_iter_parse_remove_mem (PinosBufferIter *iter, + PinosPacketRemoveMem *payload); +gboolean pinos_buffer_builder_add_remove_mem (PinosBufferBuilder *builder, + PinosPacketRemoveMem *payload); + /* header packets */ /** * PinosPacketHeader @@ -159,15 +215,15 @@ typedef struct { } PinosPacketHeader; gboolean pinos_buffer_iter_parse_header (PinosBufferIter *iter, - PinosPacketHeader *header); + PinosPacketHeader *payload); gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *builder, - PinosPacketHeader *header); + PinosPacketHeader *payload); + -/* fd-payload packets */ +/* process-mem packets */ /** - * PinosPacketFDPayload: - * @id: the unique id of this payload - * @fd_index: the index of the fd with the data + * PinosPacketProcessMem: + * @id: the mem index to process * @offset: the offset of the data * @size: the size of the data * @@ -176,31 +232,32 @@ gboolean pinos_buffer_builder_add_header (PinosBufferBuilder *buil */ typedef struct { guint32 id; - gint32 fd_index; guint64 offset; guint64 size; -} PinosPacketFDPayload; +} PinosPacketProcessMem; -gboolean pinos_buffer_iter_parse_fd_payload (PinosBufferIter *iter, - PinosPacketFDPayload *payload); -gboolean pinos_buffer_builder_add_fd_payload (PinosBufferBuilder *builder, - PinosPacketFDPayload *payload); +gboolean pinos_buffer_iter_parse_process_mem (PinosBufferIter *iter, + PinosPacketProcessMem *payload); +gboolean pinos_buffer_builder_add_process_mem (PinosBufferBuilder *builder, + PinosPacketProcessMem *payload); -/* release fd-payload packets */ +/* reuse-mem packets */ /** - * PinosPacketReleaseFDPayload: - * @id: the unique id of the fd-payload to release + * PinosPacketReuseMem: + * @id: the unique id of the memory block to reuse * * Release the payload with @id */ typedef struct { guint32 id; -} PinosPacketReleaseFDPayload; + guint64 offset; + guint64 size; +} PinosPacketReuseMem; -gboolean pinos_buffer_iter_parse_release_fd_payload (PinosBufferIter *iter, - PinosPacketReleaseFDPayload *payload); -gboolean pinos_buffer_builder_add_release_fd_payload (PinosBufferBuilder *builder, - PinosPacketReleaseFDPayload *payload); +gboolean pinos_buffer_iter_parse_reuse_mem (PinosBufferIter *iter, + PinosPacketReuseMem *payload); +gboolean pinos_buffer_builder_add_reuse_mem (PinosBufferBuilder *builder, + PinosPacketReuseMem *payload); /* format change packets */ @@ -216,9 +273,9 @@ typedef struct { const gchar *format; } PinosPacketFormatChange; -gboolean pinos_buffer_iter_parse_format_change (PinosBufferIter *iter, +gboolean pinos_buffer_iter_parse_format_change (PinosBufferIter *iter, PinosPacketFormatChange *payload); -gboolean pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, +gboolean pinos_buffer_builder_add_format_change (PinosBufferBuilder *builder, PinosPacketFormatChange *payload); @@ -235,10 +292,10 @@ typedef struct { const gchar *value; } PinosPacketPropertyChange; -gboolean pinos_buffer_iter_parse_property_change (PinosBufferIter *iter, - guint idx, +gboolean pinos_buffer_iter_parse_property_change (PinosBufferIter *iter, + guint idx, PinosPacketPropertyChange *payload); -gboolean pinos_buffer_builder_add_property_change (PinosBufferBuilder *builder, +gboolean pinos_buffer_builder_add_property_change (PinosBufferBuilder *builder, PinosPacketPropertyChange *payload); /* refresh request packets */ diff --git a/pinos/client/fdmanager.c b/pinos/client/fdmanager.c index 3175d780..b7d67afa 100644 --- a/pinos/client/fdmanager.c +++ b/pinos/client/fdmanager.c @@ -202,6 +202,43 @@ wrong_object: return FALSE; } } +/** + * pinos_fd_manager_find: + * @manager: a #PinosFdManager + * @client: a client id + * @id: an id + * + * find the object associated with the id and client from @manager. + * + * Returns: the object or %NULL + */ +gpointer +pinos_fd_manager_find (PinosFdManager *manager, + const gchar *client, guint32 id) +{ + PinosFdManagerPrivate *priv; + ObjectId *oid; + ClientIds *cids; + + g_return_val_if_fail (PINOS_IS_FD_MANAGER (manager), FALSE); + g_return_val_if_fail (client != NULL, FALSE); + + priv = manager->priv; + + g_mutex_lock (&priv->lock); + oid = g_hash_table_lookup (priv->object_ids, GINT_TO_POINTER (id)); + if (oid) { + cids = g_hash_table_lookup (priv->client_ids, client); + if (cids) { + GList *find = g_list_find (cids->ids, oid); + if (find) + return oid->obj; + } + } + g_mutex_unlock (&priv->lock); + + return NULL; +} /** * pinos_fd_manager_remove: diff --git a/pinos/client/fdmanager.h b/pinos/client/fdmanager.h index 25730701..5332bfec 100644 --- a/pinos/client/fdmanager.h +++ b/pinos/client/fdmanager.h @@ -67,6 +67,9 @@ gboolean pinos_fd_manager_add (PinosFdManager *manager, gboolean pinos_fd_manager_remove (PinosFdManager *manager, const gchar *client, guint32 id); +gpointer pinos_fd_manager_find (PinosFdManager *manager, + const gchar *client, + guint32 id); gboolean pinos_fd_manager_remove_all (PinosFdManager *manager, const gchar *client); diff --git a/pinos/client/pinos.h b/pinos/client/pinos.h index f0a3c52c..00ed54b3 100644 --- a/pinos/client/pinos.h +++ b/pinos/client/pinos.h @@ -35,6 +35,7 @@ #define PINOS_DBUS_OBJECT_SERVER PINOS_DBUS_OBJECT_PREFIX "/server" #define PINOS_DBUS_OBJECT_CLIENT PINOS_DBUS_OBJECT_PREFIX "/client" #define PINOS_DBUS_OBJECT_NODE PINOS_DBUS_OBJECT_PREFIX "/node" +#define PINOS_DBUS_OBJECT_LINK PINOS_DBUS_OBJECT_PREFIX "/link" void pinos_init (int *argc, char **argv[]); diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 3031a827..7dc13063 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -65,6 +65,11 @@ struct _PinosStreamPrivate PinosBuffer recv_buffer; guint8 recv_data[MAX_BUFFER_SIZE]; int recv_fds[MAX_FDS]; + + guint8 send_data[MAX_BUFFER_SIZE]; + int send_fds[MAX_FDS]; + + GHashTable *mem_ids; }; #define PINOS_STREAM_GET_PRIVATE(obj) \ @@ -91,6 +96,8 @@ enum static guint signals[LAST_SIGNAL] = { 0 }; +static void unhandle_socket (PinosStream *stream); + static void pinos_stream_get_property (GObject *_object, guint prop_id, @@ -547,6 +554,89 @@ channel_failed: } static gboolean +parse_buffer (PinosStream *stream, + PinosBuffer *pbuf) +{ + PinosBufferIter it; + PinosStreamPrivate *priv = stream->priv; + + pinos_buffer_iter_init (&it, pbuf); + while (pinos_buffer_iter_next (&it)) { + PinosPacketType type = pinos_buffer_iter_get_type (&it); + + switch (type) { + case PINOS_PACKET_TYPE_ADD_MEM: + { + PinosPacketAddMem p; + int fd; + + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) + break; + + fd = pinos_buffer_get_fd (pbuf, p.fd_index); + if (fd == -1) + break; + +// g_hash_table_insert (priv->mem_ids, GINT_TO_POINTER (p.id), NULL); + break; + } + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + break; + +// g_hash_table_remove (priv->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + case PINOS_PACKET_TYPE_FORMAT_CHANGE: + { + PinosPacketFormatChange p; + + if (!pinos_buffer_iter_parse_format_change (&it, &p)) + break; + + if (priv->format) + g_bytes_unref (priv->format); + priv->format = g_bytes_new (p.format, strlen (p.format) + 1); + g_object_notify (G_OBJECT (stream), "format"); + break; + } + case PINOS_PACKET_TYPE_STREAMING: + { + stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); + break; + } + case PINOS_PACKET_TYPE_STOPPED: + { + unhandle_socket (stream); + + g_clear_pointer (&priv->format, g_bytes_unref); + g_object_notify (G_OBJECT (stream), "format"); + + stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); + break; + } + case PINOS_PACKET_TYPE_HEADER: + { + break; + } + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + break; + } + default: + g_warning ("unhandled packet %d", type); + break; + } + } + pinos_buffer_iter_end (&it); + + return TRUE; +} + +static gboolean on_socket_condition (GSocket *socket, GIOCondition condition, gpointer user_data) @@ -572,6 +662,8 @@ on_socket_condition (GSocket *socket, return TRUE; } + parse_buffer (stream, buffer); + priv->buffer = buffer; g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL); priv->buffer = NULL; @@ -841,18 +933,25 @@ static gboolean do_start (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; + PinosBufferBuilder builder; + PinosPacketFormatChange fc; + PinosBuffer pbuf; + GError *error = NULL; handle_socket (stream, priv->fd); - g_dbus_proxy_call (priv->channel, - "Start", - g_variant_new ("(s)", - priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"), - G_DBUS_CALL_FLAGS_NONE, - -1, - NULL, /* GCancellable *cancellable */ - on_stream_started, - stream); + pinos_stream_buffer_builder_init (stream, &builder); + fc.id = 0; + fc.format = priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY"; + pinos_buffer_builder_add_format_change (&builder, &fc); + pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_START); + pinos_buffer_builder_end (&builder, &pbuf); + + if (!pinos_io_write_buffer (priv->fd, &pbuf, &error)) { + g_warning ("stream %p: failed to read buffer: %s", stream, error->message); + g_clear_error (&error); + } + g_object_unref (stream); return FALSE; } @@ -936,16 +1035,11 @@ call_failed: static gboolean do_stop (PinosStream *stream) { - PinosStreamPrivate *priv = stream->priv; + PinosBufferBuilder builder; - g_dbus_proxy_call (priv->channel, - "Stop", - g_variant_new ("()"), - G_DBUS_CALL_FLAGS_NONE, - -1, - NULL, /* GCancellable *cancellable */ - on_stream_stopped, - stream); + pinos_stream_buffer_builder_init (stream, &builder); + pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_STOP); + g_object_unref (stream); return FALSE; } @@ -1091,9 +1185,16 @@ pinos_stream_peek_buffer (PinosStream *stream) void pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *builder) { + PinosStreamPrivate *priv; + g_return_if_fail (PINOS_IS_STREAM (stream)); + priv = stream->priv; - pinos_buffer_builder_init (builder); + pinos_buffer_builder_init_into (builder, + priv->send_data, + MAX_BUFFER_SIZE, + priv->send_fds, + MAX_FDS); } /** diff --git a/pinos/dbus/org.pinos.xml b/pinos/dbus/org.pinos.xml index b1ab9b9d..0d9633b8 100644 --- a/pinos/dbus/org.pinos.xml +++ b/pinos/dbus/org.pinos.xml @@ -62,6 +62,7 @@ <method name='LinkNodes'> <arg type='o' name='src_node' direction='in' /> <arg type='o' name='dest_node' direction='in' /> + <arg type='s' name='format_filter' direction='in'/> <arg type='a{sv}' name='properties' direction='in'/> <arg type='o' name='link' direction='out' /> </method> @@ -114,23 +115,6 @@ <!-- Format: the current streaming format --> <property name='Format' type='s' access='read' /> - <!-- Start: - @requested_format: requested formats - @format: channel format - @properties: channel properties - - Start the datatransfer on the channel with @requested_format. - --> - <method name='Start'> - <arg type='s' name='requested_format' direction='in'/> - <arg type='s' name='format' direction='out'/> - <arg type='a{sv}' name='properties' direction='out'/> - </method> - <!-- Stop: - - Stop data transport on the channel - --> - <method name='Stop'/> <!-- Remove: Remove the channel diff --git a/pinos/gst/gstpinosdepay.c b/pinos/gst/gstpinosdepay.c index cca5e421..1c70681c 100644 --- a/pinos/gst/gstpinosdepay.c +++ b/pinos/gst/gstpinosdepay.c @@ -115,12 +115,12 @@ gst_pinos_depay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) } static void -release_fds (GstPinosDepay *this, GstBuffer *buffer) +reuse_fds (GstPinosDepay *this, GstBuffer *buffer) { GArray *fdids; guint i; PinosBufferBuilder b; - PinosPacketReleaseFDPayload r; + PinosPacketReuseMem r; PinosBuffer pbuf; gsize size; gpointer data; @@ -136,8 +136,8 @@ release_fds (GstPinosDepay *this, GstBuffer *buffer) for (i = 0; i < fdids->len; i++) { r.id = g_array_index (fdids, guint32, i); - GST_LOG ("release fd index %d", r.id); - pinos_buffer_builder_add_release_fd_payload (&b, &r); + GST_LOG ("reuse mem id %d", r.id); + pinos_buffer_builder_add_reuse_mem (&b, &r); } pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); @@ -204,24 +204,52 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_OFFSET (outbuf) = hdr.seq; break; } - case PINOS_PACKET_TYPE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_ADD_MEM: { GstMemory *fdmem = NULL; - PinosPacketFDPayload p; + PinosPacketAddMem p; int fd; - if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) goto error; + fd = g_unix_fd_list_get (fds, p.fd_index, &err); if (fd == -1) goto error; - if (outbuf == NULL) - outbuf = gst_buffer_new (); - fdmem = gst_fd_allocator_alloc (depay->fd_allocator, fd, p.offset + p.size, GST_FD_MEMORY_FLAG_NONE); gst_memory_resize (fdmem, p.offset, p.size); + + g_hash_table_insert (depay->mem_ids, GINT_TO_POINTER (p.id), fdmem); + break; + } + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + goto error; + + g_hash_table_remove (depay->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + GstMemory *fdmem = NULL; + PinosPacketProcessMem p; + + if (!pinos_buffer_iter_parse_process_mem (&it, &p)) + goto error; + + fdmem = g_hash_table_lookup (depay->mem_ids, GINT_TO_POINTER (p.id)); + if (fdmem == NULL) + goto error; + + if (outbuf == NULL) + outbuf = gst_buffer_new (); + + fdmem = gst_memory_share (fdmem, p.offset, p.size); gst_buffer_append_memory (outbuf, fdmem); if (fdids == NULL) @@ -261,7 +289,7 @@ gst_pinos_depay_chain (GstPad *pad, GstObject * parent, GstBuffer * buffer) gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (outbuf), fdids_quark, fdids, NULL); gst_mini_object_weak_ref (GST_MINI_OBJECT_CAST (outbuf), - (GstMiniObjectNotify) release_fds, g_object_ref (depay)); + (GstMiniObjectNotify) reuse_fds, g_object_ref (depay)); } return gst_pad_push (depay->srcpad, outbuf); } @@ -326,6 +354,7 @@ gst_pinos_depay_finalize (GObject * object) gst_caps_replace (&depay->caps, NULL); g_object_unref (depay->fd_allocator); + g_hash_table_unref (depay->mem_ids); G_OBJECT_CLASS (gst_pinos_depay_parent_class)->finalize (object); } @@ -376,4 +405,5 @@ gst_pinos_depay_init (GstPinosDepay * depay) gst_element_add_pad (GST_ELEMENT (depay), depay->sinkpad); depay->fd_allocator = gst_fd_allocator_new (); + depay->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_memory_unref); } diff --git a/pinos/gst/gstpinosdepay.h b/pinos/gst/gstpinosdepay.h index 527eb2c1..8eb97338 100644 --- a/pinos/gst/gstpinosdepay.h +++ b/pinos/gst/gstpinosdepay.h @@ -43,6 +43,7 @@ struct _GstPinosDepay GstPad *srcpad, *sinkpad; GstAllocator *fd_allocator; + GHashTable *mem_ids; }; struct _GstPinosDepayClass diff --git a/pinos/gst/gstpinospay.c b/pinos/gst/gstpinospay.c index ff6a87d2..de4a2afb 100644 --- a/pinos/gst/gstpinospay.c +++ b/pinos/gst/gstpinospay.c @@ -218,17 +218,17 @@ client_buffer_received (GstPinosPay *pay, GstBuffer *buffer, pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { - case PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_REUSE_MEM: { - PinosPacketReleaseFDPayload p; + PinosPacketReuseMem p; gint id; - if (!pinos_buffer_iter_parse_release_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_reuse_mem (&it, &p)) continue; id = p.id; - GST_LOG ("fd index %d for client %s is released", id, client_path); + GST_LOG ("fd index %d for client %s is reused", id, client_path); pinos_fd_manager_remove (pay->fdmanager, client_path, id); break; } @@ -358,7 +358,7 @@ release_fds (GstPinosPay *pay, GstBuffer *buffer) GArray *fdids; guint i; PinosBufferBuilder b; - PinosPacketReleaseFDPayload r; + PinosPacketReuseMem r; PinosBuffer pbuf; gsize size; gpointer data; @@ -375,7 +375,7 @@ release_fds (GstPinosPay *pay, GstBuffer *buffer) for (i = 0; i < fdids->len; i++) { r.id = g_array_index (fdids, guint32, i); GST_LOG ("release fd index %d", r.id); - pinos_buffer_builder_add_release_fd_payload (&b, &r); + pinos_buffer_builder_add_reuse_mem (&b, &r); } pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); @@ -406,11 +406,11 @@ gst_pinos_pay_chain_pinos (GstPinosPay *pay, GstBuffer * buffer) pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { - case PINOS_PACKET_TYPE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_PROCESS_MEM: { - PinosPacketFDPayload p; + PinosPacketProcessMem p; - if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_process_mem (&it, &p)) continue; if (fdids == NULL) @@ -464,7 +464,9 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) PinosBuffer pbuf; PinosBufferBuilder builder; PinosPacketHeader hdr; - PinosPacketFDPayload p; + PinosPacketAddMem am; + PinosPacketProcessMem p; + PinosPacketRemoveMem rm; gsize size; gpointer data; GSocketControlMessage *msg; @@ -482,12 +484,18 @@ gst_pinos_pay_chain_other (GstPinosPay *pay, GstBuffer * buffer) msg = g_unix_fd_message_new (); fdmem = gst_pinos_pay_get_fd_memory (pay, buffer, &tmpfile); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); - p.id = pinos_fd_manager_get_id (pay->fdmanager); + + am.id = pinos_fd_manager_get_id (pay->fdmanager); + am.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); + am.offset = 0; + am.size = fdmem->size; + p.id = am.id; p.offset = fdmem->offset; p.size = fdmem->size; - pinos_buffer_builder_add_fd_payload (&builder, &p); - + rm.id = am.id; + pinos_buffer_builder_add_add_mem (&builder, &am); + pinos_buffer_builder_add_process_mem (&builder, &p); + pinos_buffer_builder_add_remove_mem (&builder, &rm); pinos_buffer_builder_end (&builder, &pbuf); data = pinos_buffer_steal_data (&pbuf, &size); diff --git a/pinos/gst/gstpinosportsink.c b/pinos/gst/gstpinosportsink.c index 5208bcd5..f87b0380 100644 --- a/pinos/gst/gstpinosportsink.c +++ b/pinos/gst/gstpinosportsink.c @@ -40,9 +40,6 @@ #include "gstpinosportsink.h" #include "gsttmpfileallocator.h" -static GQuark fdids_quark; -static GQuark orig_buffer_quark; - GST_DEBUG_CATEGORY_STATIC (pinos_port_sink_debug); #define GST_CAT_DEFAULT pinos_port_sink_debug @@ -82,9 +79,11 @@ on_received_buffer (PinosPort *port, PinosBuffer *pbuf, GError **error, gpointer PinosBufferIter it; PinosBufferBuilder b; gboolean have_out = FALSE; + guint8 buffer[1024]; + gint fds[8]; if (this->pinos_input) { - pinos_buffer_builder_init (&b); + pinos_buffer_builder_init_into (&b, buffer, 1024, fds, 8); } pinos_buffer_iter_init (&it, pbuf); @@ -227,8 +226,9 @@ gst_pinos_port_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) PinosBufferBuilder builder; PinosBuffer pbuf; PinosPacketFormatChange fc; + guint8 buffer[1024]; - pinos_buffer_builder_init (&builder); + pinos_buffer_builder_init_into (&builder, buffer, 1024, NULL, 0); fc.id = 0; fc.format = cstr = gst_caps_to_string (caps); pinos_buffer_builder_add_format_change (&builder, &fc); @@ -297,31 +297,38 @@ gst_pinos_port_sink_render_other (GstPinosPortSink * this, GstBuffer * buffer) PinosBuffer pbuf; PinosBufferBuilder builder; PinosPacketHeader hdr; - PinosPacketFDPayload p; + PinosPacketAddMem am; + PinosPacketProcessMem p; + PinosPacketRemoveMem rm; gboolean tmpfile = TRUE; - gint fd; + guint8 send_buffer[1024]; + gint send_fds[8]; hdr.flags = 0; hdr.seq = GST_BUFFER_OFFSET (buffer); hdr.pts = GST_BUFFER_PTS (buffer) + GST_ELEMENT_CAST (this)->base_time; hdr.dts_offset = 0; - pinos_buffer_builder_init (&builder); + pinos_buffer_builder_init_into (&builder, send_buffer, 1024, send_fds, 8); pinos_buffer_builder_add_header (&builder, &hdr); fdmem = gst_pinos_port_sink_get_fd_memory (this, buffer, &tmpfile); - fd = gst_fd_memory_get_fd (fdmem); - p.fd_index = pinos_buffer_builder_add_fd (&builder, fd); - p.id = pinos_fd_manager_get_id (this->fdmanager); + am.id = pinos_fd_manager_get_id (this->fdmanager); + am.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); + am.offset = 0; + am.size = fdmem->size; + p.id = am.id; p.offset = fdmem->offset; p.size = fdmem->size; - pinos_buffer_builder_add_fd_payload (&builder, &p); + rm.id = am.id; + pinos_buffer_builder_add_add_mem (&builder, &am); + pinos_buffer_builder_add_process_mem (&builder, &p); + pinos_buffer_builder_add_remove_mem (&builder, &rm); + pinos_buffer_builder_end (&builder, &pbuf); GST_LOG ("send %d %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT, p.id, hdr.pts, GST_BUFFER_PTS (buffer), GST_ELEMENT_CAST (this)->base_time); - pinos_buffer_builder_end (&builder, &pbuf); - if (!pinos_port_send_buffer (this->port, &pbuf, &error)) { GST_WARNING ("send failed: %s", error->message); g_clear_error (&error); @@ -383,9 +390,6 @@ gst_pinos_port_sink_class_init (GstPinosPortSinkClass * klass) gstbasesink_class->propose_allocation = gst_pinos_port_sink_propose_allocation; gstbasesink_class->render = gst_pinos_port_sink_render; - fdids_quark = g_quark_from_static_string ("GstPinosPortSinkFDIds"); - orig_buffer_quark = g_quark_from_static_string ("GstPinosPortSinkOrigBuffer"); - GST_DEBUG_CATEGORY_INIT (pinos_port_sink_debug, "pinosportsink", 0, "Pinos Socket Sink"); } diff --git a/pinos/gst/gstpinosportsrc.c b/pinos/gst/gstpinosportsrc.c index d7518fa5..4aac43b0 100644 --- a/pinos/gst/gstpinosportsrc.c +++ b/pinos/gst/gstpinosportsrc.c @@ -45,7 +45,7 @@ #include <gst/video/video.h> -static GQuark fdpayload_data_quark; +static GQuark process_mem_data_quark; GST_DEBUG_CATEGORY_STATIC (pinos_port_src_debug); #define GST_CAT_DEFAULT pinos_port_src_debug @@ -83,16 +83,16 @@ static gboolean gst_pinos_port_src_query (GstBaseSrc * src, GstQuery * query); typedef struct { GstPinosPortSrc *src; - PinosPacketFDPayload p; -} FDPayloadData; + PinosPacketProcessMem p; +} ProcessMemData; static void -fdpayload_data_destroy (gpointer user_data) +process_mem_data_destroy (gpointer user_data) { - FDPayloadData *data = user_data; + ProcessMemData *data = user_data; GstPinosPortSrc *this = data->src; PinosBufferBuilder b; - PinosPacketReleaseFDPayload r; + PinosPacketReuseMem r; PinosBuffer pbuf; r.id = data->p.id; @@ -100,14 +100,14 @@ fdpayload_data_destroy (gpointer user_data) GST_DEBUG_OBJECT (this, "destroy %d", r.id); pinos_buffer_builder_init (&b); - pinos_buffer_builder_add_release_fd_payload (&b, &r); + pinos_buffer_builder_add_reuse_mem (&b, &r); pinos_buffer_builder_end (&b, &pbuf); pinos_port_send_buffer (this->port, &pbuf, NULL); pinos_buffer_unref (&pbuf); gst_object_unref (this); - g_slice_free (FDPayloadData, data); + g_slice_free (ProcessMemData, data); } static gboolean @@ -145,33 +145,58 @@ on_received_buffer (PinosPort *port, GST_BUFFER_OFFSET (buf) = hdr.seq; break; } - case PINOS_PACKET_TYPE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_ADD_MEM: { GstMemory *fdmem = NULL; - FDPayloadData data; + PinosPacketAddMem p; int fd; - if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p)) + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) break; - GST_DEBUG ("got fd payload id %d", data.p.id); - fd = pinos_buffer_get_fd (pbuf, data.p.fd_index); + fd = pinos_buffer_get_fd (pbuf, p.fd_index); if (fd == -1) break; + fdmem = gst_fd_allocator_alloc (this->fd_allocator, dup (fd), + p.offset + p.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, p.offset, p.size); + + g_hash_table_insert (this->mem_ids, GINT_TO_POINTER (p.id), fdmem); + break; + } + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + break; + + g_hash_table_remove (this->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + GstMemory *fdmem = NULL; + ProcessMemData data; + + if (!pinos_buffer_iter_parse_process_mem (&it, &data.p)) + break; + + if (!(fdmem = g_hash_table_lookup (this->mem_ids, GINT_TO_POINTER (data.p.id)))) + break; + if (buf == NULL) buf = gst_buffer_new (); - fdmem = gst_fd_allocator_alloc (this->fd_allocator, dup (fd), - data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, data.p.offset, data.p.size); + fdmem = gst_memory_share (fdmem, data.p.offset, data.p.size); gst_buffer_append_memory (buf, fdmem); data.src = gst_object_ref (this); gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (fdmem), - fdpayload_data_quark, - g_slice_dup (FDPayloadData, &data), - fdpayload_data_destroy); + process_mem_data_quark, + g_slice_dup (ProcessMemData, &data), + process_mem_data_destroy); break; } case PINOS_PACKET_TYPE_FORMAT_CHANGE: @@ -313,6 +338,7 @@ gst_pinos_port_src_finalize (GObject * object) g_object_unref (this->port); if (this->clock) gst_object_unref (this->clock); + g_hash_table_unref (this->mem_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -363,7 +389,7 @@ gst_pinos_port_src_class_init (GstPinosPortSrcClass * klass) GST_DEBUG_CATEGORY_INIT (pinos_port_src_debug, "pinosportsrc", 0, "Pinos Source"); - fdpayload_data_quark = g_quark_from_static_string ("GstPinosPortSrcFDPayloadQuark"); + process_mem_data_quark = g_quark_from_static_string ("GstPinosPortSrcProcessMemQuark"); } static void @@ -378,6 +404,7 @@ gst_pinos_port_src_init (GstPinosPortSrc * src) g_queue_init (&src->queue); src->fd_allocator = gst_fd_allocator_new (); + src->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_memory_unref); } static void diff --git a/pinos/gst/gstpinosportsrc.h b/pinos/gst/gstpinosportsrc.h index 29635ef8..a7f04c1e 100644 --- a/pinos/gst/gstpinosportsrc.h +++ b/pinos/gst/gstpinosportsrc.h @@ -65,6 +65,7 @@ struct _GstPinosPortSrc { GstClock *clock; GstAllocator *fd_allocator; + GHashTable *mem_ids; GQueue queue; GCond cond; diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index af32982e..de7a6e49 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -117,7 +117,7 @@ gst_pinos_sink_finalize (GObject * object) if (pinossink->properties) gst_structure_free (pinossink->properties); - g_hash_table_unref (pinossink->fdids); + g_hash_table_unref (pinossink->mem_ids); g_object_unref (pinossink->allocator); g_free (pinossink->path); g_free (pinossink->client_name); @@ -210,11 +210,12 @@ static void gst_pinos_sink_init (GstPinosSink * sink) { sink->allocator = gst_tmpfile_allocator_new (); + sink->fdmanager = pinos_fd_manager_get (PINOS_FD_MANAGER_DEFAULT); sink->client_name = pinos_client_name(); sink->mode = DEFAULT_PROP_MODE; - sink->fdids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - (GDestroyNotify) gst_buffer_unref); + sink->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + (GDestroyNotify) gst_memory_unref); } static GstCaps * @@ -345,15 +346,15 @@ on_new_buffer (GObject *gobject, pinos_buffer_iter_init (&it, pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { - case PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_REUSE_MEM: { - PinosPacketReleaseFDPayload p; + PinosPacketReuseMem p; - if (!pinos_buffer_iter_parse_release_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_reuse_mem (&it, &p)) continue; - GST_LOG ("fd index %d is released", p.id); - g_hash_table_remove (pinossink->fdids, GINT_TO_POINTER (p.id)); + GST_LOG ("mem index %d is reused", p.id); + g_hash_table_remove (pinossink->mem_ids, GINT_TO_POINTER (p.id)); break; } case PINOS_PACKET_TYPE_REFRESH_REQUEST: @@ -508,10 +509,11 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GstMemory *mem = NULL; GstClockTime pts, dts, base; PinosPacketHeader hdr; - PinosPacketFDPayload p; + PinosPacketAddMem am; + PinosPacketProcessMem p; + PinosPacketRemoveMem rm; gsize size; gboolean tmpfile, res; - gint fd; pinossink = GST_PINOS_SINK (bsink); @@ -559,27 +561,32 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) pinos_stream_buffer_builder_init (pinossink->stream, &builder); pinos_buffer_builder_add_header (&builder, &hdr); - fd = gst_fd_memory_get_fd (mem); - p.fd_index = pinos_buffer_builder_add_fd (&builder, fd); - p.id = pinossink->id_counter++; - p.offset = 0; - p.size = size; - pinos_buffer_builder_add_fd_payload (&builder, &p); + am.id = pinos_fd_manager_get_id (pinossink->fdmanager); + am.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (mem)); + am.offset = 0; + am.size = mem->size; + p.id = am.id; + p.offset = mem->offset; + p.size = mem->size; + rm.id = am.id; + pinos_buffer_builder_add_add_mem (&builder, &am); + pinos_buffer_builder_add_process_mem (&builder, &p); + pinos_buffer_builder_add_remove_mem (&builder, &rm); pinos_buffer_builder_end (&builder, &pbuf); - GST_LOG ("sending fd index %d %d %d", p.id, p.fd_index, fd); + GST_LOG ("sending fd mem %d %d", am.id, am.fd_index); res = pinos_stream_send_buffer (pinossink->stream, &pbuf); pinos_buffer_steal_fds (&pbuf, NULL); pinos_buffer_unref (&pbuf); pinos_main_loop_unlock (pinossink->loop); - gst_memory_unref (mem); - if (res && !tmpfile) { - /* keep the buffer around until we get the release fd message */ - g_hash_table_insert (pinossink->fdids, GINT_TO_POINTER (p.id), gst_buffer_ref (buffer)); - } + /* keep the memory around until we get the reuse mem message */ + g_hash_table_insert (pinossink->mem_ids, GINT_TO_POINTER (p.id), mem); + } else + gst_memory_unref (mem); + return GST_FLOW_OK; not_negotiated: @@ -792,10 +799,10 @@ gst_pinos_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: - g_hash_table_remove_all (this->fdids); + g_hash_table_remove_all (this->mem_ids); break; case GST_STATE_CHANGE_READY_TO_NULL: - g_hash_table_remove_all (this->fdids); + g_hash_table_remove_all (this->mem_ids); gst_pinos_sink_close (this); break; default: diff --git a/pinos/gst/gstpinossink.h b/pinos/gst/gstpinossink.h index e8282c76..d0708b6b 100644 --- a/pinos/gst/gstpinossink.h +++ b/pinos/gst/gstpinossink.h @@ -84,8 +84,8 @@ struct _GstPinosSink { GstStructure *properties; GstPinosSinkMode mode; - guint32 id_counter; - GHashTable *fdids; + PinosFdManager *fdmanager; + GHashTable *mem_ids; }; struct _GstPinosSinkClass { diff --git a/pinos/gst/gstpinossocketsink.c b/pinos/gst/gstpinossocketsink.c index 95b91064..7c3a8560 100644 --- a/pinos/gst/gstpinossocketsink.c +++ b/pinos/gst/gstpinossocketsink.c @@ -174,12 +174,12 @@ gst_pinos_socket_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) } static void -release_fds (GstPinosSocketSink *this, GstBuffer *buffer) +reuse_fds (GstPinosSocketSink *this, GstBuffer *buffer) { GArray *fdids; guint i; PinosBufferBuilder b; - PinosPacketReleaseFDPayload r; + PinosPacketReuseMem r; PinosBuffer pbuf; gsize size; gpointer data; @@ -195,8 +195,8 @@ release_fds (GstPinosSocketSink *this, GstBuffer *buffer) for (i = 0; i < fdids->len; i++) { r.id = g_array_index (fdids, guint32, i); - GST_LOG ("release fd index %d", r.id); - pinos_buffer_builder_add_release_fd_payload (&b, &r); + GST_LOG ("reuse fd index %d", r.id); + pinos_buffer_builder_add_reuse_mem (&b, &r); } pinos_buffer_builder_end (&b, &pbuf); g_array_unref (fdids); @@ -227,17 +227,17 @@ gst_pinos_socket_sink_render_pinos (GstPinosSocketSink * this, GstBuffer * buffe pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { - case PINOS_PACKET_TYPE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_PROCESS_MEM: { - PinosPacketFDPayload p; + PinosPacketProcessMem p; - if (!pinos_buffer_iter_parse_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_process_mem (&it, &p)) continue; if (fdids == NULL) fdids = g_array_new (FALSE, FALSE, sizeof (guint32)); - GST_LOG ("track fd index %d", p.id); + GST_LOG ("track fd id %d", p.id); g_array_append_val (fdids, p.id); break; } @@ -270,7 +270,7 @@ gst_pinos_socket_sink_render_pinos (GstPinosSocketSink * this, GstBuffer * buffe gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buffer), fdids_quark, fdids, NULL); gst_mini_object_weak_ref (GST_MINI_OBJECT_CAST (buffer), - (GstMiniObjectNotify) release_fds, g_object_ref (this)); + (GstMiniObjectNotify) reuse_fds, g_object_ref (this)); } gst_burst_cache_queue_buffer (this->cache, gst_buffer_ref (buffer)); @@ -310,7 +310,9 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe PinosBuffer pbuf; PinosBufferBuilder builder; PinosPacketHeader hdr; - PinosPacketFDPayload p; + PinosPacketAddMem am; + PinosPacketProcessMem p; + PinosPacketRemoveMem rm; gsize size; gpointer data; GSocketControlMessage *msg; @@ -326,17 +328,23 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe pinos_buffer_builder_add_header (&builder, &hdr); fdmem = gst_pinos_socket_sink_get_fd_memory (this, buffer, &tmpfile); - p.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); - p.id = pinos_fd_manager_get_id (this->fdmanager); + + am.id = pinos_fd_manager_get_id (this->fdmanager); + am.fd_index = pinos_buffer_builder_add_fd (&builder, gst_fd_memory_get_fd (fdmem)); + am.offset = 0; + am.size = fdmem->size; + p.id = am.id; p.offset = fdmem->offset; p.size = fdmem->size; - pinos_buffer_builder_add_fd_payload (&builder, &p); + rm.id = am.id; + pinos_buffer_builder_add_add_mem (&builder, &am); + pinos_buffer_builder_add_process_mem (&builder, &p); + pinos_buffer_builder_add_remove_mem (&builder, &rm); + pinos_buffer_builder_end (&builder, &pbuf); GST_LOG ("send %d %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT, p.id, hdr.pts, GST_BUFFER_PTS (buffer), GST_ELEMENT_CAST (this)->base_time); - pinos_buffer_builder_end (&builder, &pbuf); - data = pinos_buffer_steal_data (&pbuf, &size); fds = pinos_buffer_steal_fds (&pbuf, &n_fds); pinos_buffer_unref (&pbuf); @@ -548,17 +556,17 @@ myreader_receive_buffer (GstPinosSocketSink *this, MyReader *myreader) pinos_buffer_iter_init (&it, &pbuf); while (pinos_buffer_iter_next (&it)) { switch (pinos_buffer_iter_get_type (&it)) { - case PINOS_PACKET_TYPE_RELEASE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_REUSE_MEM: { - PinosPacketReleaseFDPayload p; + PinosPacketReuseMem p; gint id; - if (!pinos_buffer_iter_parse_release_fd_payload (&it, &p)) + if (!pinos_buffer_iter_parse_reuse_mem (&it, &p)) continue; id = p.id; - GST_LOG ("fd index %d for client %s is released", id, client_path); + GST_LOG ("fd id %d for client %s is reused", id, client_path); pinos_fd_manager_remove (this->fdmanager, client_path, id); break; } diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 93ee4de1..8cbdb564 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -45,7 +45,7 @@ #include <gst/video/video.h> -static GQuark fdpayload_data_quark; +static GQuark process_mem_data_quark; GST_DEBUG_CATEGORY_STATIC (pinos_src_debug); #define GST_CAT_DEFAULT pinos_src_debug @@ -183,6 +183,7 @@ gst_pinos_src_finalize (GObject * object) gst_object_unref (pinossrc->clock); g_free (pinossrc->path); g_free (pinossrc->client_name); + g_hash_table_unref (pinossrc->mem_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -254,7 +255,7 @@ gst_pinos_src_class_init (GstPinosSrcClass * klass) GST_DEBUG_CATEGORY_INIT (pinos_src_debug, "pinossrc", 0, "Pinos Source"); - fdpayload_data_quark = g_quark_from_static_string ("GstPinosSrcFDPayloadQuark"); + process_mem_data_quark = g_quark_from_static_string ("GstPinosSrcProcessMemQuark"); } static void @@ -269,6 +270,7 @@ gst_pinos_src_init (GstPinosSrc * src) src->fd_allocator = gst_fd_allocator_new (); src->client_name = pinos_client_name (); + src->mem_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_memory_unref); } static GstCaps * @@ -320,16 +322,16 @@ gst_pinos_src_src_fixate (GstBaseSrc * bsrc, GstCaps * caps) typedef struct { GstPinosSrc *src; - PinosPacketFDPayload p; -} FDPayloadData; + PinosPacketProcessMem p; +} ProcessMemData; static void -fdpayload_data_destroy (gpointer user_data) +process_mem_data_destroy (gpointer user_data) { - FDPayloadData *data = user_data; + ProcessMemData *data = user_data; GstPinosSrc *pinossrc = data->src; PinosBufferBuilder b; - PinosPacketReleaseFDPayload r; + PinosPacketReuseMem r; PinosBuffer pbuf; r.id = data->p.id; @@ -339,7 +341,7 @@ fdpayload_data_destroy (gpointer user_data) GST_OBJECT_LOCK (pinossrc); if (pinossrc->stream_state == PINOS_STREAM_STATE_STREAMING) { pinos_stream_buffer_builder_init (pinossrc->stream, &b); - pinos_buffer_builder_add_release_fd_payload (&b, &r); + pinos_buffer_builder_add_reuse_mem (&b, &r); pinos_buffer_builder_end (&b, &pbuf); GST_DEBUG_OBJECT (pinossrc, "send release-fd for %d", r.id); @@ -349,7 +351,7 @@ fdpayload_data_destroy (gpointer user_data) GST_OBJECT_UNLOCK (pinossrc); gst_object_unref (pinossrc); - g_slice_free (FDPayloadData, data); + g_slice_free (ProcessMemData, data); } static void @@ -390,33 +392,59 @@ on_new_buffer (GObject *gobject, GST_BUFFER_OFFSET (buf) = hdr.seq; break; } - case PINOS_PACKET_TYPE_FD_PAYLOAD: + case PINOS_PACKET_TYPE_ADD_MEM: { GstMemory *fdmem = NULL; - FDPayloadData data; + PinosPacketAddMem p; int fd; - if (!pinos_buffer_iter_parse_fd_payload (&it, &data.p)) + if (!pinos_buffer_iter_parse_add_mem (&it, &p)) goto parse_failed; - GST_DEBUG ("got fd payload id %d", data.p.id); - fd = pinos_buffer_get_fd (pbuf, data.p.fd_index); + fd = pinos_buffer_get_fd (pbuf, p.fd_index); if (fd == -1) - goto no_fds; + goto parse_failed; + + fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (fd), + p.offset + p.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, p.offset, p.size); + + g_hash_table_insert (pinossrc->mem_ids, GINT_TO_POINTER (p.id), fdmem); + break; + } + case PINOS_PACKET_TYPE_REMOVE_MEM: + { + PinosPacketRemoveMem p; + + if (!pinos_buffer_iter_parse_remove_mem (&it, &p)) + goto parse_failed; + + g_hash_table_remove (pinossrc->mem_ids, GINT_TO_POINTER (p.id)); + break; + } + case PINOS_PACKET_TYPE_PROCESS_MEM: + { + GstMemory *fdmem = NULL; + ProcessMemData data; + + if (!pinos_buffer_iter_parse_process_mem (&it, &data.p)) + goto parse_failed; + + GST_DEBUG ("got mem id %d", data.p.id); + if (!(fdmem = g_hash_table_lookup (pinossrc->mem_ids, GINT_TO_POINTER (data.p.id)))) + goto parse_failed; if (buf == NULL) buf = gst_buffer_new (); - fdmem = gst_fd_allocator_alloc (pinossrc->fd_allocator, dup (fd), - data.p.offset + data.p.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, data.p.offset, data.p.size); + fdmem = gst_memory_share (fdmem, data.p.offset, data.p.size); gst_buffer_append_memory (buf, fdmem); data.src = gst_object_ref (pinossrc); gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (fdmem), - fdpayload_data_quark, - g_slice_dup (FDPayloadData, &data), - fdpayload_data_destroy); + process_mem_data_quark, + g_slice_dup (ProcessMemData, &data), + process_mem_data_destroy); break; } case PINOS_PACKET_TYPE_FORMAT_CHANGE: @@ -456,16 +484,6 @@ parse_failed: pinos_main_loop_signal (pinossrc->loop, FALSE); return; } -no_fds: - { - pinos_buffer_iter_end (&it); - pinos_buffer_unref (pbuf); - gst_buffer_unref (buf); - GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, ("fd not found in buffer"), (NULL)); - pinos_main_loop_signal (pinossrc->loop, FALSE); - return; - } - } static void diff --git a/pinos/gst/gstpinossrc.h b/pinos/gst/gstpinossrc.h index 4678b346..a7cf62a3 100644 --- a/pinos/gst/gstpinossrc.h +++ b/pinos/gst/gstpinossrc.h @@ -71,6 +71,7 @@ struct _GstPinosSrc { GstAllocator *fd_allocator; GstStructure *properties; + GHashTable *mem_ids; GQueue queue; GstClock *clock; }; diff --git a/pinos/server/channel.c b/pinos/server/channel.c index 41d33cdd..e364e7df 100644 --- a/pinos/server/channel.c +++ b/pinos/server/channel.c @@ -154,9 +154,9 @@ pinos_channel_get_property (GObject *_object, static void pinos_channel_set_property (GObject *_object, - guint prop_id, - const GValue *value, - GParamSpec *pspec) + guint prop_id, + const GValue *value, + GParamSpec *pspec) { PinosChannel *channel = PINOS_CHANNEL (_object); PinosChannelPrivate *priv = channel->priv; @@ -234,89 +234,6 @@ stop_transfer (PinosChannel *channel) pinos_port_deactivate (priv->port); clear_formats (channel); priv->state = PINOS_CHANNEL_STATE_STOPPED; - g_object_set (priv->iface, - "state", priv->state, - NULL); -} - - - -static gboolean -handle_start (PinosChannel1 *interface, - GDBusMethodInvocation *invocation, - const gchar *arg_requested_format, - gpointer user_data) -{ - PinosChannel *channel = user_data; - PinosChannelPrivate *priv = channel->priv; - GBytes *req_format, *format; - const gchar *format_str; - GError *error = NULL; - - priv->state = PINOS_CHANNEL_STATE_STARTING; - - req_format = g_bytes_new (arg_requested_format, strlen (arg_requested_format) + 1); - - format = pinos_format_filter (priv->possible_formats, req_format, &error); - if (format == NULL) - goto no_format; - - format_str = g_bytes_get_data (format, NULL); - - g_debug ("channel %p: handle start, format %s", channel, format_str); - - g_object_set (priv->port, "possible-formats", format, NULL); - pinos_port_activate (priv->port); - g_object_get (priv->port, "format", &format, NULL); - if (format == NULL) - goto no_format_activate; - - format_str = g_bytes_get_data (format, NULL); - - priv->state = PINOS_CHANNEL_STATE_STREAMING; - g_debug ("channel %p: we are now streaming in format \"%s\"", channel, format_str); - - g_dbus_method_invocation_return_value (invocation, - g_variant_new ("(s@a{sv})", - format_str, - pinos_properties_to_variant (priv->properties))); - g_object_set (priv->iface, - "format", format_str, - "state", priv->state, - NULL); - - return TRUE; - -no_format: - { - g_debug ("no format found"); - g_dbus_method_invocation_return_gerror (invocation, error); - g_clear_error (&error); - return TRUE; - } -no_format_activate: - { - g_debug ("no format found when activating"); - g_dbus_method_invocation_return_dbus_error (invocation, - "org.pinos.Error", "can't negotiate formats"); - return TRUE; - } -} - -static gboolean -handle_stop (PinosChannel1 *interface, - GDBusMethodInvocation *invocation, - gpointer user_data) -{ - PinosChannel *channel = user_data; - - g_debug ("channel %p: handle stop", channel); - - stop_transfer (channel); - - g_dbus_method_invocation_return_value (invocation, NULL); - - return TRUE; } static gboolean @@ -355,6 +272,93 @@ on_send_buffer (PinosPort *port, } static gboolean +parse_buffer (PinosChannel *channel, + PinosBuffer *pbuf) +{ + PinosBufferIter it; + PinosChannelPrivate *priv = channel->priv; + + pinos_buffer_iter_init (&it, pbuf); + while (pinos_buffer_iter_next (&it)) { + PinosPacketType type = pinos_buffer_iter_get_type (&it); + + switch (type) { + case PINOS_PACKET_TYPE_FORMAT_CHANGE: + { + PinosPacketFormatChange p; + GBytes *format, *req_format; + GError *error = NULL; + const gchar *format_str; + + if (!pinos_buffer_iter_parse_format_change (&it, &p)) + break; + + req_format = g_bytes_new_static (p.format, strlen (p.format) + 1); + format = pinos_format_filter (priv->possible_formats, req_format, &error); + g_bytes_unref (req_format); + + if (format == NULL) + break; + + format_str = g_bytes_get_data (format, NULL); + + g_debug ("channel %p: format change %s", channel, format_str); + g_object_set (priv->port, "possible-formats", format, NULL); + g_object_set (priv->iface, "format", format_str, NULL); + break; + } + case PINOS_PACKET_TYPE_START: + { + GBytes *format; + PinosBufferBuilder builder; + PinosPacketFormatChange fc; + PinosBuffer obuf; + guint8 buffer[1024]; + GError *error = NULL; + + pinos_port_activate (priv->port); + g_object_get (priv->port, "format", &format, NULL); + if (format == NULL) + break; + + fc.id = 0; + fc.format = g_bytes_get_data (format, NULL); + + g_debug ("channel %p: we are now streaming in format \"%s\"", channel, fc.format); + + priv->state = PINOS_CHANNEL_STATE_STREAMING; + pinos_buffer_builder_init_into (&builder, buffer, 1024, NULL, 0); + pinos_buffer_builder_add_format_change (&builder, &fc); + pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_STREAMING); + pinos_buffer_builder_end (&builder, &obuf); + g_object_set (priv->iface, "state", priv->state, NULL); + + if (!pinos_io_write_buffer (priv->fd, &obuf, &error)) { + g_warning ("channel %p: error writing buffer: %s", channel, error->message); + g_clear_error (&error); + } + + break; + } + case PINOS_PACKET_TYPE_STOP: + { + break; + } + case PINOS_PACKET_TYPE_REUSE_MEM: + { + break; + } + default: + g_warning ("unhandled packet %d", type); + break; + } + } + pinos_buffer_iter_end (&it); + + return TRUE; +} + +static gboolean on_socket_condition (GSocket *socket, GIOCondition condition, gpointer user_data) @@ -380,6 +384,8 @@ on_socket_condition (GSocket *socket, return TRUE; } + parse_buffer (channel, buffer); + if (!pinos_port_receive_buffer (priv->port, buffer, &error)) { g_warning ("channel %p: port %p failed to receive buffer: %s", channel, priv->port, error->message); g_clear_error (&error); @@ -485,7 +491,10 @@ channel_register_object (PinosChannel *channel) PinosObjectSkeleton *skel; gchar *name; - priv->send_id = pinos_port_add_send_buffer_cb (priv->port, on_send_buffer, channel, NULL); + priv->send_id = pinos_port_add_send_buffer_cb (priv->port, + on_send_buffer, + channel, + NULL); name = g_strdup_printf ("%s/channel", priv->client_path); skel = pinos_object_skeleton_new (name); @@ -557,6 +566,7 @@ pinos_channel_constructed (GObject * object) G_OBJECT_CLASS (pinos_channel_parent_class)->constructed (object); } + static void pinos_channel_class_init (PinosChannelClass * klass) { @@ -677,8 +687,6 @@ pinos_channel_init (PinosChannel * channel) PinosChannelPrivate *priv = channel->priv = PINOS_CHANNEL_GET_PRIVATE (channel); priv->iface = pinos_channel1_skeleton_new (); - g_signal_connect (priv->iface, "handle-start", (GCallback) handle_start, channel); - g_signal_connect (priv->iface, "handle-stop", (GCallback) handle_stop, channel); g_signal_connect (priv->iface, "handle-remove", (GCallback) handle_remove, channel); priv->state = PINOS_CHANNEL_STATE_STOPPED; diff --git a/pinos/server/link.c b/pinos/server/link.c new file mode 100644 index 00000000..9f1f5e13 --- /dev/null +++ b/pinos/server/link.c @@ -0,0 +1,401 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include <string.h> + +#include <gio/gio.h> + +#include "pinos/client/pinos.h" +#include "pinos/client/enumtypes.h" + +#include "pinos/server/link.h" +#include "pinos/server/port.h" + +#include "pinos/dbus/org-pinos.h" + +#define PINOS_LINK_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_LINK, PinosLinkPrivate)) + +struct _PinosLinkPrivate +{ + PinosDaemon *daemon; + PinosLink1 *iface; + + gchar *object_path; + + PinosPort *src; + PinosPort *dest; + GBytes *possible_formats; + GBytes *format; + PinosProperties *properties; +}; + +G_DEFINE_TYPE (PinosLink, pinos_link, G_TYPE_OBJECT); + +enum +{ + PROP_0, + PROP_DAEMON, + PROP_OBJECT_PATH, + PROP_POSSIBLE_FORMATS, + PROP_FORMAT, + PROP_PROPERTIES, +}; + +enum +{ + SIGNAL_REMOVE, + SIGNAL_ACTIVATE, + SIGNAL_DEACTIVATE, + LAST_SIGNAL +}; + +static guint signals[LAST_SIGNAL] = { 0 }; + +static void +pinos_link_get_property (GObject *_object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + PinosLink *link = PINOS_LINK (_object); + PinosLinkPrivate *priv = link->priv; + + switch (prop_id) { + case PROP_DAEMON: + g_value_set_object (value, priv->daemon); + break; + + case PROP_OBJECT_PATH: + g_value_set_string (value, priv->object_path); + break; + + case PROP_POSSIBLE_FORMATS: + g_value_set_boxed (value, priv->possible_formats); + break; + + case PROP_FORMAT: + g_value_set_boxed (value, priv->format); + break; + + case PROP_PROPERTIES: + g_value_set_boxed (value, priv->properties); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (link, prop_id, pspec); + break; + } +} + +static void +pinos_link_set_property (GObject *_object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + PinosLink *link = PINOS_LINK (_object); + PinosLinkPrivate *priv = link->priv; + + switch (prop_id) { + case PROP_DAEMON: + priv->daemon = g_value_dup_object (value); + break; + + case PROP_OBJECT_PATH: + priv->object_path = g_value_dup_string (value); + break; + + case PROP_POSSIBLE_FORMATS: + if (priv->possible_formats) + g_bytes_unref (priv->possible_formats); + priv->possible_formats = g_value_dup_boxed (value); + break; + + case PROP_FORMAT: + if (priv->format) + g_bytes_unref (priv->format); + priv->format = g_value_dup_boxed (value); + break; + + case PROP_PROPERTIES: + if (priv->properties) + pinos_properties_free (priv->properties); + priv->properties = g_value_dup_boxed (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (link, prop_id, pspec); + break; + } +} + +static void +link_register_object (PinosLink *link) +{ + PinosLinkPrivate *priv = link->priv; + PinosObjectSkeleton *skel; + gchar *name; + + name = g_strdup_printf (PINOS_DBUS_OBJECT_LINK); + skel = pinos_object_skeleton_new (name); + g_free (name); + + pinos_object_skeleton_set_link1 (skel, priv->iface); + + g_free (priv->object_path); + priv->object_path = pinos_daemon_export_uniquely (priv->daemon, G_DBUS_OBJECT_SKELETON (skel)); + g_object_unref (skel); + + g_debug ("link %p: register object %s", link, priv->object_path); +} + +static void +link_unregister_object (PinosLink *link) +{ + PinosLinkPrivate *priv = link->priv; + + g_debug ("link %p: unregister object", link); + pinos_daemon_unexport (priv->daemon, priv->object_path); +} + +static void +pinos_link_constructed (GObject * object) +{ + PinosLink *link = PINOS_LINK (object); + + g_debug ("link %p: constructed", link); + link_register_object (link); + + G_OBJECT_CLASS (pinos_link_parent_class)->constructed (object); +} + +static void +pinos_link_dispose (GObject * object) +{ + PinosLink *link = PINOS_LINK (object); + + g_debug ("link %p: dispose", link); + link_unregister_object (link); + + G_OBJECT_CLASS (pinos_link_parent_class)->dispose (object); +} + +static void +pinos_link_finalize (GObject * object) +{ + PinosLink *link = PINOS_LINK (object); + PinosLinkPrivate *priv = link->priv; + + g_debug ("link %p: finalize", link); + g_clear_pointer (&priv->possible_formats, g_bytes_unref); + g_clear_pointer (&priv->format, g_bytes_unref); + g_clear_pointer (&priv->properties, pinos_properties_free); + g_clear_object (&priv->daemon); + g_clear_object (&priv->iface); + g_free (priv->object_path); + + G_OBJECT_CLASS (pinos_link_parent_class)->finalize (object); +} + +static void +pinos_link_class_init (PinosLinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + //PinosLinkClass *link_class = PINOS_LINK_CLASS (klass); + + g_type_class_add_private (klass, sizeof (PinosLinkPrivate)); + + gobject_class->constructed = pinos_link_constructed; + gobject_class->dispose = pinos_link_dispose; + gobject_class->finalize = pinos_link_finalize; + gobject_class->set_property = pinos_link_set_property; + gobject_class->get_property = pinos_link_get_property; + + g_object_class_install_property (gobject_class, + PROP_DAEMON, + g_param_spec_object ("daemon", + "Daemon", + "The Daemon", + PINOS_TYPE_DAEMON, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_POSSIBLE_FORMATS, + g_param_spec_boxed ("possible-formats", + "Possible Formats", + "The possbile formats of the link", + G_TYPE_BYTES, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_FORMAT, + g_param_spec_boxed ("format", + "Format", + "The format of the link", + G_TYPE_BYTES, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_PROPERTIES, + g_param_spec_boxed ("properties", + "Properties", + "The properties of the link", + PINOS_TYPE_PROPERTIES, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + + signals[SIGNAL_REMOVE] = g_signal_new ("remove", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 0, + G_TYPE_NONE); + signals[SIGNAL_ACTIVATE] = g_signal_new ("activate", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 0, + G_TYPE_NONE); + signals[SIGNAL_DEACTIVATE] = g_signal_new ("deactivate", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 0, + G_TYPE_NONE); +} + +static void +pinos_link_init (PinosLink * link) +{ + PinosLinkPrivate *priv = link->priv = PINOS_LINK_GET_PRIVATE (link); + + priv->iface = pinos_link1_skeleton_new (); + g_debug ("link %p: new", link); +} + +/** + * pinos_link_remove: + * @link: a #PinosLink + * + * Trigger removal of @link + */ +void +pinos_link_remove (PinosLink *link) +{ + g_return_if_fail (PINOS_IS_LINK (link)); + + g_debug ("link %p: remove", link); + g_signal_emit (link, signals[SIGNAL_REMOVE], 0, NULL); +} + +/** + * pinos_link_get_object_path: + * @link: a #PinosLink + * + * Get the object patch of @link + * + * Returns: the object path of @source. + */ +const gchar * +pinos_link_get_object_path (PinosLink *link) +{ + PinosLinkPrivate *priv; + + g_return_val_if_fail (PINOS_IS_LINK (link), NULL); + priv = link->priv; + + return priv->object_path; +} + +/** + * pinos_link_get_possible_formats: + * @link: a #PinosLink + * + * Get the possible formats of @link + * + * Returns: the possible formats or %NULL + */ +GBytes * +pinos_link_get_possible_formats (PinosLink *link) +{ + PinosLinkPrivate *priv; + + g_return_val_if_fail (PINOS_IS_LINK (link), NULL); + priv = link->priv; + + return priv->possible_formats; +} + +/** + * pinos_link_get_format: + * @link: a #PinosLink + * + * Get the format of @link + * + * Returns: the format or %NULL + */ +GBytes * +pinos_link_get_format (PinosLink *link) +{ + PinosLinkPrivate *priv; + + g_return_val_if_fail (PINOS_IS_LINK (link), NULL); + priv = link->priv; + + return priv->format; +} + +/** + * pinos_link_get_properties: + * @link: a #PinosLink + * + * Get the properties of @link + * + * Returns: the properties or %NULL + */ +PinosProperties * +pinos_link_get_properties (PinosLink *link) +{ + PinosLinkPrivate *priv; + + g_return_val_if_fail (PINOS_IS_LINK (link), NULL); + priv = link->priv; + + return priv->properties; +} diff --git a/pinos/server/link.h b/pinos/server/link.h new file mode 100644 index 00000000..2ac7010f --- /dev/null +++ b/pinos/server/link.h @@ -0,0 +1,75 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PINOS_LINK_H__ +#define __PINOS_LINK_H__ + +#include <glib-object.h> + +G_BEGIN_DECLS + +typedef struct _PinosLink PinosLink; +typedef struct _PinosLinkClass PinosLinkClass; +typedef struct _PinosLinkPrivate PinosLinkPrivate; + +#include <pinos/server/daemon.h> + +#define PINOS_TYPE_LINK (pinos_link_get_type ()) +#define PINOS_IS_LINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), PINOS_TYPE_LINK)) +#define PINOS_IS_LINK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), PINOS_TYPE_LINK)) +#define PINOS_LINK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), PINOS_TYPE_LINK, PinosLinkClass)) +#define PINOS_LINK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), PINOS_TYPE_LINK, PinosLink)) +#define PINOS_LINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), PINOS_TYPE_LINK, PinosLinkClass)) +#define PINOS_LINK_CAST(obj) ((PinosLink*)(obj)) +#define PINOS_LINK_CLASS_CAST(klass)((PinosLinkClass*)(klass)) + +/** + * PinosLink: + * + * Pinos link object class. + */ +struct _PinosLink { + GObject object; + + PinosLinkPrivate *priv; +}; + +/** + * PinosLinkClass: + * + * Pinos link object class. + */ +struct _PinosLinkClass { + GObjectClass parent_class; +}; + +/* normal GObject stuff */ +GType pinos_link_get_type (void); + +void pinos_link_remove (PinosLink *link); + +GBytes * pinos_link_get_possible_formats (PinosLink *link); +GBytes * pinos_link_get_format (PinosLink *link); +PinosProperties * pinos_link_get_properties (PinosLink *link); + +const gchar * pinos_link_get_object_path (PinosLink *link); + +G_END_DECLS + +#endif /* __PINOS_LINK_H__ */ diff --git a/pinos/server/port.c b/pinos/server/port.c index 285dbbad..c84bf7dd 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -619,7 +619,7 @@ pinos_port_create_channel (PinosPort *port, return NULL; g_debug ("port %p: make channel with formats %s", port, - g_bytes_get_data (possible_formats, NULL)); + (gchar *) g_bytes_get_data (possible_formats, NULL)); channel = g_object_new (PINOS_TYPE_CHANNEL, "daemon", priv->daemon, "port", port, |