summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/elements/gstqueue2.c333
1 files changed, 152 insertions, 181 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c
index 7dfcb5fb49..0b096c256e 100644
--- a/plugins/elements/gstqueue2.c
+++ b/plugins/elements/gstqueue2.c
@@ -366,8 +366,8 @@ gst_queue2_class_init (GstQueue2Class * klass)
g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
g_param_spec_uint64 ("ring-buffer-max-size",
"Max. ring buffer size (bytes)",
- "Max. amount of data in the ring buffer (bytes, 0=unlimited)",
- DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
+ "Max. amount of data in the ring buffer (bytes, 0 = disabled",
+ 0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* set several parent class virtual functions */
@@ -951,89 +951,6 @@ update_out_rates (GstQueue2 * queue)
queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
}
-#ifdef HAVE_FSEEKO
-#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0)
-#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
-#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
-#else
-#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
-#endif
-
-static gboolean
-gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
-{
- guint size;
- guint8 *data;
- guint64 writing_pos, max_reading_pos;
- GstQueue2Range *next;
-
- writing_pos = queue->current->writing_pos;
- max_reading_pos = queue->current->max_reading_pos;
-
- if (FSEEK_FILE (queue->temp_file, writing_pos))
- goto seek_failed;
-
- data = GST_BUFFER_DATA (buffer);
- size = GST_BUFFER_SIZE (buffer);
-
- if (fwrite (data, size, 1, queue->temp_file) != 1)
- goto handle_error;
-
- writing_pos += size;
-
- GST_INFO_OBJECT (queue,
- "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT,
- writing_pos, max_reading_pos);
-
- /* try to merge with next range */
- while ((next = queue->current->next)) {
- GST_INFO_OBJECT (queue,
- "checking merge with next range %" G_GUINT64_FORMAT " < %"
- G_GUINT64_FORMAT, writing_pos, next->offset);
- if (writing_pos < next->offset)
- break;
-
- GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
- next->writing_pos);
-
- /* remove the group, we could choose to not read the data in this range
- * again. This would involve us doing a seek to the current writing position
- * in the range. FIXME, It would probably make sense to do a seek when there
- * is a lot of data in the range we merged with to avoid reading it all
- * again. */
- queue->current->next = next->next;
- g_slice_free (GstQueue2Range, next);
-
- debug_ranges (queue);
- }
- queue->current->writing_pos = writing_pos;
- update_cur_level (queue, queue->current);
-
- return TRUE;
-
- /* ERRORS */
-seek_failed:
- {
- GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
- return FALSE;
- }
-handle_error:
- {
- switch (errno) {
- case ENOSPC:{
- GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
- break;
- }
- default:{
- GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
- (_("Error while writing to download file.")),
- ("%s", g_strerror (errno)));
- }
- }
- return FALSE;
- }
-}
-
static void
update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
{
@@ -1087,9 +1004,6 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
perform_seek_to_offset (queue, range->writing_pos);
}
- /* update the current reading position in the range */
- update_cur_pos (queue, queue->current, offset + length);
-
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
@@ -1133,6 +1047,14 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
return FALSE;
}
+#ifdef HAVE_FSEEKO
+#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0)
+#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
+#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
+#else
+#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0)
+#endif
+
static gint64
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
guint8 * dst)
@@ -1185,6 +1107,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
guint block_length, remaining, read_length;
gint64 read_return;
guint64 rb_size;
+ guint64 rpos;
/* allocate the output buffer of the requested size */
buf = gst_buffer_new_and_alloc (length);
@@ -1193,20 +1116,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
offset);
+ rpos = queue->current->reading_pos = offset;
rb_size = queue->ring_buffer_max_size;
remaining = length;
while (remaining > 0) {
/* configure how much/whether to read */
- if (!gst_queue2_have_data (queue, offset, length)) {
+ if (!gst_queue2_have_data (queue, rpos, remaining)) {
read_length = 0;
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
guint64 level;
/* calculate how far away the offset is */
- if (queue->current->writing_pos > offset)
- level = queue->current->writing_pos - offset;
+ if (queue->current->writing_pos > rpos)
+ level = queue->current->writing_pos - rpos;
else
level = 0;
@@ -1224,7 +1148,14 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
read_length = rb_size;
}
}
+
if (read_length == 0) {
+ if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+ /* protect cached data (data between offset and max_reading_pos)
+ * and update current level */
+ queue->current->max_reading_pos = rpos;
+ update_cur_level (queue, queue->current);
+ }
GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
continue;
}
@@ -1236,7 +1167,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
/* congfigure how much and from where to read */
if (QUEUE_IS_USING_RING_BUFFER (queue)) {
file_offset =
- (queue->current->rb_offset + (offset -
+ (queue->current->rb_offset + (rpos -
queue->current->offset)) % rb_size;
if (file_offset + read_length > rb_size) {
block_length = rb_size - file_offset;
@@ -1244,7 +1175,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
block_length = read_length;
}
} else {
- file_offset = offset;
+ file_offset = rpos;
block_length = read_length;
}
@@ -1265,7 +1196,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
block_length = read_length;
remaining -= read_return;
- queue->current->reading_pos += read_return;
+ rpos = (queue->current->reading_pos += read_return);
+ update_cur_pos (queue, queue->current, queue->current->reading_pos);
}
GST_QUEUE2_SIGNAL_DEL (queue);
GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
@@ -1504,15 +1436,19 @@ out_flushing:
}
static gboolean
-gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
+gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer)
{
guint8 *data;
guint size, rb_size;
- guint64 writing_pos, new_writing_pos;
+ guint64 writing_pos, new_writing_pos, max_reading_pos;
gint64 space;
- GstQueue2Range *range, *prev;
+ GstQueue2Range *range, *prev, *next;
- writing_pos = queue->current->rb_writing_pos;
+ if (QUEUE_IS_USING_RING_BUFFER (queue))
+ writing_pos = queue->current->rb_writing_pos;
+ else
+ writing_pos = queue->current->writing_pos;
+ max_reading_pos = queue->current->max_reading_pos;
rb_size = queue->ring_buffer_max_size;
size = GST_BUFFER_SIZE (buffer);
@@ -1521,46 +1457,77 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
while (size > 0) {
guint to_write;
- /* calculate the space in the ring buffer not used by data from the
- * current range */
- while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
- /* wait until there is some free space */
- GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
- }
- /* get the amount of space we have */
- space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
-
- /* calculate if we need to split or if we can write the entire buffer now */
- to_write = MIN (size, space);
-
- /* the writing position in the ring buffer after writing (part or all of)
- * the buffer */
- new_writing_pos = (writing_pos + to_write) % rb_size;
-
- prev = NULL;
- range = queue->ranges;
-
- /* if we need to overwrite data in the ring buffer, we need to update the
- * ranges
- * warning: this code is complicated and includes some simplifications -
- * pen, paper and diagrams for the cases recommended! */
- while (range) {
- guint64 range_data_start, range_data_end;
- GstQueue2Range *range_to_destroy = NULL;
-
- /* we don't edit the current range here */
- if (range == queue->current)
- goto next_range;
+ if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+ /* calculate the space in the ring buffer not used by data from
+ * the current range */
+ while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
+ /* wait until there is some free space */
+ GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
+ }
+ /* get the amount of space we have */
+ space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
+
+ /* calculate if we need to split or if we can write the entire
+ * buffer now */
+ to_write = MIN (size, space);
+
+ /* the writing position in the ring buffer after writing (part
+ * or all of) the buffer */
+ new_writing_pos = (writing_pos + to_write) % rb_size;
+
+ debug_ranges (queue);
+
+ prev = NULL;
+ range = queue->ranges;
+
+ /* if we need to overwrite data in the ring buffer, we need to
+ * update the ranges
+ *
+ * warning: this code is complicated and includes some
+ * simplifications - pen, paper and diagrams for the cases
+ * recommended! */
+ while (range) {
+ guint64 range_data_start, range_data_end;
+ GstQueue2Range *range_to_destroy = NULL;
+
+ /* we don't edit the current range here */
+ if (range == queue->current)
+ goto next_range;
- range_data_start = range->rb_offset;
- range_data_end = range->rb_writing_pos;
+ range_data_start = range->rb_offset;
+ range_data_end = range->rb_writing_pos;
+
+ if (range_data_end > range_data_start) {
+ if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
+ goto next_range;
+
+ if (new_writing_pos > range_data_start) {
+ if (new_writing_pos >= range_data_end) {
+ GST_DEBUG_OBJECT (queue,
+ "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
+ G_GUINT64_FORMAT, range->offset, range->writing_pos);
+ /* remove range */
+ range_to_destroy = range;
+ if (prev)
+ prev->next = range->next;
+ } else {
+ GST_DEBUG_OBJECT (queue,
+ "advancing offsets from %" G_GUINT64_FORMAT " (%"
+ G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
+ G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
+ range->offset + new_writing_pos - range_data_start,
+ new_writing_pos);
+ range->offset += (new_writing_pos - range_data_start);
+ range->rb_offset = new_writing_pos;
+ }
+ }
+ } else {
+ guint64 new_wpos_virt = writing_pos + to_write;
- if (range_data_end > range_data_start) {
- if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
- goto next_range;
+ if (new_wpos_virt <= range_data_start)
+ goto next_range;
- if (new_writing_pos > range_data_start) {
- if (new_writing_pos >= range_data_end) {
+ if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
GST_DEBUG_OBJECT (queue,
"Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
G_GUINT64_FORMAT, range->offset, range->writing_pos);
@@ -1575,47 +1542,28 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
range->offset + new_writing_pos - range_data_start,
new_writing_pos);
- range->offset += (new_writing_pos - range_data_start);
+ range->offset += (new_wpos_virt - range_data_start);
range->rb_offset = new_writing_pos;
}
}
- } else {
- guint64 new_wpos_virt = writing_pos + to_write;
- if (new_wpos_virt <= range_data_start)
- goto next_range;
+ next_range:
+ if (!range_to_destroy)
+ prev = range;
- if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
- GST_DEBUG_OBJECT (queue,
- "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
- G_GUINT64_FORMAT, range->offset, range->writing_pos);
- /* remove range */
- range_to_destroy = range;
- if (prev)
- prev->next = range->next;
- } else {
- GST_DEBUG_OBJECT (queue,
- "advancing offsets from %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT
- ") to %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT ")",
- range->offset, range->rb_offset,
- range->offset + new_writing_pos - range_data_start,
- new_writing_pos);
- range->offset += (new_wpos_virt - range_data_start);
- range->rb_offset = new_writing_pos;
+ range = range->next;
+ if (range_to_destroy) {
+ if (range_to_destroy == queue->ranges)
+ queue->ranges = range;
+ g_slice_free (GstQueue2Range, range_to_destroy);
+ range_to_destroy = NULL;
}
}
+ debug_ranges (queue);
- next_range:
- if (!range_to_destroy)
- prev = range;
-
- range = range->next;
- if (range_to_destroy) {
- if (range_to_destroy == queue->ranges)
- queue->ranges = range;
- g_slice_free (GstQueue2Range, range_to_destroy);
- range_to_destroy = NULL;
- }
+ } else {
+ space = to_write = size;
+ new_writing_pos = writing_pos + to_write;
}
if (FSEEK_FILE (queue->temp_file, writing_pos))
@@ -1623,9 +1571,32 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
if (new_writing_pos > writing_pos) {
GST_INFO_OBJECT (queue, "writing %u bytes", to_write);
- /* no wrapping, just write */
+ /* either not using ring buffer or no wrapping, just write */
if (fwrite (data, to_write, 1, queue->temp_file) != 1)
goto handle_error;
+
+ if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
+ /* try to merge with next range */
+ while ((next = queue->current->next)) {
+ GST_INFO_OBJECT (queue,
+ "checking merge with next range %" G_GUINT64_FORMAT " < %"
+ G_GUINT64_FORMAT, new_writing_pos, next->offset);
+ if (new_writing_pos < next->offset)
+ break;
+
+ GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
+ next->writing_pos);
+ /* we will run over the offset of the next group */
+ queue->current->writing_pos = new_writing_pos = next->writing_pos;
+
+ /* remove the group */
+ queue->current->next = next->next;
+ g_slice_free (GstQueue2Range, next);
+
+ debug_ranges (queue);
+ }
+ goto update_and_signal;
+ }
} else {
/* wrapping */
guint block_one, block_two;
@@ -1650,17 +1621,22 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
}
}
+ update_and_signal:
/* update the writing positions */
size -= to_write;
- data += to_write;
- queue->current->writing_pos += to_write;
- queue->current->rb_writing_pos = writing_pos = new_writing_pos;
- update_cur_level (queue, queue->current);
-
GST_INFO_OBJECT (queue,
"wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
to_write, writing_pos, size);
+ if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+ data += to_write;
+ queue->current->writing_pos += to_write;
+ queue->current->rb_writing_pos = writing_pos = new_writing_pos;
+ } else {
+ queue->current->writing_pos = writing_pos = new_writing_pos;
+ }
+ update_cur_level (queue, queue->current);
+
GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)",
queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
@@ -1721,13 +1697,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
/* update the byterate stats */
update_in_rates (queue);
- /* FIXME - check return values? */
- if (QUEUE_IS_USING_RING_BUFFER (queue)) {
- gst_queue2_write_buffer_to_ring_buffer (queue, buffer);
- } else if (QUEUE_IS_USING_TEMP_FILE (queue)) {
- gst_queue2_write_buffer_to_file (queue, buffer);
- }
-
+ /* FIXME - check return value? */
+ gst_queue2_write_buffer_to_file (queue, buffer);
} else if (GST_IS_EVENT (item)) {
GstEvent *event;