diff options
author | Robert Swain <robert.swain@collabora.co.uk> | 2010-06-18 17:43:40 +0200 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2010-08-13 16:38:57 +0200 |
commit | 9df54eb4ffcc3c06f1653b816e8953046d1fc0d6 (patch) | |
tree | 7f20b6ddea2c62a5f97d40137057cbdc6e8108b2 | |
parent | e29cca10a43d6883253ebf4f586d46a9a925afaa (diff) |
queue2: extend ring buffer to support ram mode
-rw-r--r-- | plugins/elements/gstqueue2.c | 101 | ||||
-rw-r--r-- | plugins/elements/gstqueue2.h | 1 |
2 files changed, 74 insertions, 28 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index 0b096c256e..0a86522e96 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -64,6 +64,8 @@ #include "gst/gst-i18n-lib.h" +#include <string.h> + #ifdef G_OS_WIN32 #include <io.h> /* lseek, open, close, read */ #undef lseek @@ -450,6 +452,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class) queue->temp_remove = DEFAULT_TEMP_REMOVE; queue->use_ring_buffer = FALSE; + queue->ring_buffer = NULL; queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE; GST_DEBUG_OBJECT (queue, @@ -1059,18 +1062,29 @@ static gint64 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length, guint8 * dst) { + guint8 *ring_buffer; size_t res; - if (FSEEK_FILE (queue->temp_file, offset)) + ring_buffer = queue->ring_buffer; + + if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset)) goto seek_failed; /* this should not block */ GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT, length, offset); - res = fread (dst, 1, length, queue->temp_file); + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + res = fread (dst, 1, length, queue->temp_file); + } else { + memcpy (dst, ring_buffer + offset, length); + res = length; + } + GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res); if (G_UNLIKELY (res < length)) { + if (!QUEUE_IS_USING_TEMP_FILE (queue)) + goto could_not_read; /* check for errors or EOF */ if (ferror (queue->temp_file)) goto could_not_read; @@ -1307,8 +1321,6 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue) } GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template); - init_ranges (queue); - return TRUE; /* ERRORS */ @@ -1370,15 +1382,15 @@ gst_queue2_flush_temp_file (GstQueue2 * queue) GST_DEBUG_OBJECT (queue, "flushing temp file"); queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file); - - init_ranges (queue); } static void gst_queue2_locked_flush (GstQueue2 * queue) { if (!QUEUE_IS_USING_QUEUE (queue)) { - gst_queue2_flush_temp_file (queue); + if (QUEUE_IS_USING_TEMP_FILE (queue)) + gst_queue2_flush_temp_file (queue); + init_ranges (queue); } else { while (!g_queue_is_empty (queue->queue)) { GstMiniObject *data = g_queue_pop_head (queue->queue); @@ -1436,9 +1448,9 @@ out_flushing: } static gboolean -gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) +gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer) { - guint8 *data; + guint8 *data, *ring_buffer; guint size, rb_size; guint64 writing_pos, new_writing_pos, max_reading_pos; gint64 space; @@ -1449,6 +1461,7 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) else writing_pos = queue->current->writing_pos; max_reading_pos = queue->current->max_reading_pos; + ring_buffer = queue->ring_buffer; rb_size = queue->ring_buffer_max_size; size = GST_BUFFER_SIZE (buffer); @@ -1475,8 +1488,6 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) * or all of) the buffer */ new_writing_pos = (writing_pos + to_write) % rb_size; - debug_ranges (queue); - prev = NULL; range = queue->ranges; @@ -1559,21 +1570,24 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) range_to_destroy = NULL; } } - debug_ranges (queue); - } else { space = to_write = size; new_writing_pos = writing_pos + to_write; } - if (FSEEK_FILE (queue->temp_file, writing_pos)) + if (QUEUE_IS_USING_TEMP_FILE (queue) + && FSEEK_FILE (queue->temp_file, writing_pos)) goto seek_failed; if (new_writing_pos > writing_pos) { GST_INFO_OBJECT (queue, "writing %u bytes", to_write); /* either not using ring buffer or no wrapping, just write */ - if (fwrite (data, to_write, 1, queue->temp_file) != 1) - goto handle_error; + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (fwrite (data, to_write, 1, queue->temp_file) != 1) + goto handle_error; + } else { + memcpy (ring_buffer + writing_pos, data, to_write); + } if (!QUEUE_IS_USING_RING_BUFFER (queue)) { /* try to merge with next range */ @@ -1607,17 +1621,25 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) if (block_one > 0) { GST_INFO_OBJECT (queue, "writing %u bytes", block_one); /* write data to end of ring buffer */ - if (fwrite (data, block_one, 1, queue->temp_file) != 1) - goto handle_error; + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (fwrite (data, block_one, 1, queue->temp_file) != 1) + goto handle_error; + } else { + memcpy (ring_buffer + writing_pos, data, block_one); + } } - if (FSEEK_FILE (queue->temp_file, 0)) + if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0)) goto seek_failed; if (block_two > 0) { GST_INFO_OBJECT (queue, "writing %u bytes", block_two); - if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1) - goto handle_error; + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1) + goto handle_error; + } else { + memcpy (ring_buffer, data + block_one, block_two); + } } } @@ -1698,7 +1720,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) update_in_rates (queue); /* FIXME - check return value? */ - gst_queue2_write_buffer_to_file (queue, buffer); + gst_queue2_create_write (queue, buffer); } else if (GST_IS_EVENT (item)) { GstEvent *event; @@ -2598,8 +2620,15 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active) if (active) { if (!QUEUE_IS_USING_QUEUE (queue)) { - /* open the temp file now */ - result = gst_queue2_open_temp_location_file (queue); + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + /* open the temp file now */ + result = gst_queue2_open_temp_location_file (queue); + } else if (!queue->ring_buffer) { + queue->ring_buffer = malloc (queue->ring_buffer_max_size); + result = !!queue->ring_buffer; + } + + init_ranges (queue); GST_QUEUE2_MUTEX_LOCK (queue); GST_DEBUG_OBJECT (queue, "activating pull mode"); @@ -2646,8 +2675,18 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_READY_TO_PAUSED: if (!QUEUE_IS_USING_QUEUE (queue)) { - if (!gst_queue2_open_temp_location_file (queue)) - ret = GST_STATE_CHANGE_FAILURE; + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!gst_queue2_open_temp_location_file (queue)) + ret = GST_STATE_CHANGE_FAILURE; + } else { + if (queue->ring_buffer) { + free (queue->ring_buffer); + queue->ring_buffer = NULL; + } + if (!(queue->ring_buffer = malloc (queue->ring_buffer_max_size))) + ret = GST_STATE_CHANGE_FAILURE; + } + init_ranges (queue); } queue->segment_event_received = FALSE; queue->starting_segment = NULL; @@ -2670,8 +2709,14 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: - if (!QUEUE_IS_USING_QUEUE (queue)) - gst_queue2_close_temp_location_file (queue); + if (!QUEUE_IS_USING_QUEUE (queue)) { + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + gst_queue2_close_temp_location_file (queue); + } else if (queue->ring_buffer) { + free (queue->ring_buffer); + queue->ring_buffer = NULL; + } + } if (queue->starting_segment != NULL) { gst_event_unref (queue->starting_segment); queue->starting_segment = NULL; diff --git a/plugins/elements/gstqueue2.h b/plugins/elements/gstqueue2.h index 2b036c5635..6898ba85c7 100644 --- a/plugins/elements/gstqueue2.h +++ b/plugins/elements/gstqueue2.h @@ -137,6 +137,7 @@ struct _GstQueue2 gboolean use_ring_buffer; guint64 ring_buffer_max_size; + guint8 * ring_buffer; }; struct _GstQueue2Class |