summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Swain <robert.swain@collabora.co.uk>2010-06-18 17:43:40 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2010-08-13 16:38:57 +0200
commit9df54eb4ffcc3c06f1653b816e8953046d1fc0d6 (patch)
tree7f20b6ddea2c62a5f97d40137057cbdc6e8108b2
parente29cca10a43d6883253ebf4f586d46a9a925afaa (diff)
queue2: extend ring buffer to support ram mode
-rw-r--r--plugins/elements/gstqueue2.c101
-rw-r--r--plugins/elements/gstqueue2.h1
2 files changed, 74 insertions, 28 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c
index 0b096c256e..0a86522e96 100644
--- a/plugins/elements/gstqueue2.c
+++ b/plugins/elements/gstqueue2.c
@@ -64,6 +64,8 @@
#include "gst/gst-i18n-lib.h"
+#include <string.h>
+
#ifdef G_OS_WIN32
#include <io.h> /* lseek, open, close, read */
#undef lseek
@@ -450,6 +452,7 @@ gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
queue->temp_remove = DEFAULT_TEMP_REMOVE;
queue->use_ring_buffer = FALSE;
+ queue->ring_buffer = NULL;
queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
GST_DEBUG_OBJECT (queue,
@@ -1059,18 +1062,29 @@ static gint64
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
guint8 * dst)
{
+ guint8 *ring_buffer;
size_t res;
- if (FSEEK_FILE (queue->temp_file, offset))
+ ring_buffer = queue->ring_buffer;
+
+ if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
goto seek_failed;
/* this should not block */
GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
length, offset);
- res = fread (dst, 1, length, queue->temp_file);
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ res = fread (dst, 1, length, queue->temp_file);
+ } else {
+ memcpy (dst, ring_buffer + offset, length);
+ res = length;
+ }
+
GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
if (G_UNLIKELY (res < length)) {
+ if (!QUEUE_IS_USING_TEMP_FILE (queue))
+ goto could_not_read;
/* check for errors or EOF */
if (ferror (queue->temp_file))
goto could_not_read;
@@ -1307,8 +1321,6 @@ gst_queue2_open_temp_location_file (GstQueue2 * queue)
}
GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
- init_ranges (queue);
-
return TRUE;
/* ERRORS */
@@ -1370,15 +1382,15 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
GST_DEBUG_OBJECT (queue, "flushing temp file");
queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
-
- init_ranges (queue);
}
static void
gst_queue2_locked_flush (GstQueue2 * queue)
{
if (!QUEUE_IS_USING_QUEUE (queue)) {
- gst_queue2_flush_temp_file (queue);
+ if (QUEUE_IS_USING_TEMP_FILE (queue))
+ gst_queue2_flush_temp_file (queue);
+ init_ranges (queue);
} else {
while (!g_queue_is_empty (queue->queue)) {
GstMiniObject *data = g_queue_pop_head (queue->queue);
@@ -1436,9 +1448,9 @@ out_flushing:
}
static gboolean
-gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
+gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
{
- guint8 *data;
+ guint8 *data, *ring_buffer;
guint size, rb_size;
guint64 writing_pos, new_writing_pos, max_reading_pos;
gint64 space;
@@ -1449,6 +1461,7 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
else
writing_pos = queue->current->writing_pos;
max_reading_pos = queue->current->max_reading_pos;
+ ring_buffer = queue->ring_buffer;
rb_size = queue->ring_buffer_max_size;
size = GST_BUFFER_SIZE (buffer);
@@ -1475,8 +1488,6 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
* or all of) the buffer */
new_writing_pos = (writing_pos + to_write) % rb_size;
- debug_ranges (queue);
-
prev = NULL;
range = queue->ranges;
@@ -1559,21 +1570,24 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
range_to_destroy = NULL;
}
}
- debug_ranges (queue);
-
} else {
space = to_write = size;
new_writing_pos = writing_pos + to_write;
}
- if (FSEEK_FILE (queue->temp_file, writing_pos))
+ if (QUEUE_IS_USING_TEMP_FILE (queue)
+ && FSEEK_FILE (queue->temp_file, writing_pos))
goto seek_failed;
if (new_writing_pos > writing_pos) {
GST_INFO_OBJECT (queue, "writing %u bytes", to_write);
/* either not using ring buffer or no wrapping, just write */
- if (fwrite (data, to_write, 1, queue->temp_file) != 1)
- goto handle_error;
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (fwrite (data, to_write, 1, queue->temp_file) != 1)
+ goto handle_error;
+ } else {
+ memcpy (ring_buffer + writing_pos, data, to_write);
+ }
if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
/* try to merge with next range */
@@ -1607,17 +1621,25 @@ gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
if (block_one > 0) {
GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
/* write data to end of ring buffer */
- if (fwrite (data, block_one, 1, queue->temp_file) != 1)
- goto handle_error;
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (fwrite (data, block_one, 1, queue->temp_file) != 1)
+ goto handle_error;
+ } else {
+ memcpy (ring_buffer + writing_pos, data, block_one);
+ }
}
- if (FSEEK_FILE (queue->temp_file, 0))
+ if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
goto seek_failed;
if (block_two > 0) {
GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
- if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
- goto handle_error;
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
+ goto handle_error;
+ } else {
+ memcpy (ring_buffer, data + block_one, block_two);
+ }
}
}
@@ -1698,7 +1720,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
update_in_rates (queue);
/* FIXME - check return value? */
- gst_queue2_write_buffer_to_file (queue, buffer);
+ gst_queue2_create_write (queue, buffer);
} else if (GST_IS_EVENT (item)) {
GstEvent *event;
@@ -2598,8 +2620,15 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
if (active) {
if (!QUEUE_IS_USING_QUEUE (queue)) {
- /* open the temp file now */
- result = gst_queue2_open_temp_location_file (queue);
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ /* open the temp file now */
+ result = gst_queue2_open_temp_location_file (queue);
+ } else if (!queue->ring_buffer) {
+ queue->ring_buffer = malloc (queue->ring_buffer_max_size);
+ result = !!queue->ring_buffer;
+ }
+
+ init_ranges (queue);
GST_QUEUE2_MUTEX_LOCK (queue);
GST_DEBUG_OBJECT (queue, "activating pull mode");
@@ -2646,8 +2675,18 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
if (!QUEUE_IS_USING_QUEUE (queue)) {
- if (!gst_queue2_open_temp_location_file (queue))
- ret = GST_STATE_CHANGE_FAILURE;
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!gst_queue2_open_temp_location_file (queue))
+ ret = GST_STATE_CHANGE_FAILURE;
+ } else {
+ if (queue->ring_buffer) {
+ free (queue->ring_buffer);
+ queue->ring_buffer = NULL;
+ }
+ if (!(queue->ring_buffer = malloc (queue->ring_buffer_max_size)))
+ ret = GST_STATE_CHANGE_FAILURE;
+ }
+ init_ranges (queue);
}
queue->segment_event_received = FALSE;
queue->starting_segment = NULL;
@@ -2670,8 +2709,14 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
- if (!QUEUE_IS_USING_QUEUE (queue))
- gst_queue2_close_temp_location_file (queue);
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ gst_queue2_close_temp_location_file (queue);
+ } else if (queue->ring_buffer) {
+ free (queue->ring_buffer);
+ queue->ring_buffer = NULL;
+ }
+ }
if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment);
queue->starting_segment = NULL;
diff --git a/plugins/elements/gstqueue2.h b/plugins/elements/gstqueue2.h
index 2b036c5635..6898ba85c7 100644
--- a/plugins/elements/gstqueue2.h
+++ b/plugins/elements/gstqueue2.h
@@ -137,6 +137,7 @@ struct _GstQueue2
gboolean use_ring_buffer;
guint64 ring_buffer_max_size;
+ guint8 * ring_buffer;
};
struct _GstQueue2Class