diff options
-rw-r--r-- | plugins/elements/gstqueue2.c | 333 |
1 files changed, 152 insertions, 181 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c index 7dfcb5fb49..0b096c256e 100644 --- a/plugins/elements/gstqueue2.c +++ b/plugins/elements/gstqueue2.c @@ -366,8 +366,8 @@ gst_queue2_class_init (GstQueue2Class * klass) g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE, g_param_spec_uint64 ("ring-buffer-max-size", "Max. ring buffer size (bytes)", - "Max. amount of data in the ring buffer (bytes, 0=unlimited)", - DEFAULT_BUFFER_SIZE, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE, + "Max. amount of data in the ring buffer (bytes, 0 = disabled", + 0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /* set several parent class virtual functions */ @@ -951,89 +951,6 @@ update_out_rates (GstQueue2 * queue) queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); } -#ifdef HAVE_FSEEKO -#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0) -#elif defined (G_OS_UNIX) || defined (G_OS_WIN32) -#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1) -#else -#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0) -#endif - -static gboolean -gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) -{ - guint size; - guint8 *data; - guint64 writing_pos, max_reading_pos; - GstQueue2Range *next; - - writing_pos = queue->current->writing_pos; - max_reading_pos = queue->current->max_reading_pos; - - if (FSEEK_FILE (queue->temp_file, writing_pos)) - goto seek_failed; - - data = GST_BUFFER_DATA (buffer); - size = GST_BUFFER_SIZE (buffer); - - if (fwrite (data, size, 1, queue->temp_file) != 1) - goto handle_error; - - writing_pos += size; - - GST_INFO_OBJECT (queue, - "writing %" G_GUINT64_FORMAT ", max_reading %" G_GUINT64_FORMAT, - writing_pos, max_reading_pos); - - /* try to merge with next range */ - while ((next = queue->current->next)) { - GST_INFO_OBJECT (queue, - "checking merge with next range %" G_GUINT64_FORMAT " < %" - G_GUINT64_FORMAT, writing_pos, next->offset); - if (writing_pos < next->offset) - break; - - GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT, - next->writing_pos); - - /* remove the group, we could choose to not read the data in this range - * again. This would involve us doing a seek to the current writing position - * in the range. FIXME, It would probably make sense to do a seek when there - * is a lot of data in the range we merged with to avoid reading it all - * again. */ - queue->current->next = next->next; - g_slice_free (GstQueue2Range, next); - - debug_ranges (queue); - } - queue->current->writing_pos = writing_pos; - update_cur_level (queue, queue->current); - - return TRUE; - - /* ERRORS */ -seek_failed: - { - GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } -handle_error: - { - switch (errno) { - case ENOSPC:{ - GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL)); - break; - } - default:{ - GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, - (_("Error while writing to download file.")), - ("%s", g_strerror (errno))); - } - } - return FALSE; - } -} - static void update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos) { @@ -1087,9 +1004,6 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) perform_seek_to_offset (queue, range->writing_pos); } - /* update the current reading position in the range */ - update_cur_pos (queue, queue->current, offset + length); - GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)", queue->cur_level.bytes, QUEUE_MAX_BYTES (queue)); @@ -1133,6 +1047,14 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length) return FALSE; } +#ifdef HAVE_FSEEKO +#define FSEEK_FILE(file,offset) (fseeko (file, (off_t) offset, SEEK_SET) != 0) +#elif defined (G_OS_UNIX) || defined (G_OS_WIN32) +#define FSEEK_FILE(file,offset) (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1) +#else +#define FSEEK_FILE(file,offset) (fseek (file, offset, SEEK_SET) != 0) +#endif + static gint64 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length, guint8 * dst) @@ -1185,6 +1107,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, guint block_length, remaining, read_length; gint64 read_return; guint64 rb_size; + guint64 rpos; /* allocate the output buffer of the requested size */ buf = gst_buffer_new_and_alloc (length); @@ -1193,20 +1116,21 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length, offset); + rpos = queue->current->reading_pos = offset; rb_size = queue->ring_buffer_max_size; remaining = length; while (remaining > 0) { /* configure how much/whether to read */ - if (!gst_queue2_have_data (queue, offset, length)) { + if (!gst_queue2_have_data (queue, rpos, remaining)) { read_length = 0; if (QUEUE_IS_USING_RING_BUFFER (queue)) { guint64 level; /* calculate how far away the offset is */ - if (queue->current->writing_pos > offset) - level = queue->current->writing_pos - offset; + if (queue->current->writing_pos > rpos) + level = queue->current->writing_pos - rpos; else level = 0; @@ -1224,7 +1148,14 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, read_length = rb_size; } } + if (read_length == 0) { + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + /* protect cached data (data between offset and max_reading_pos) + * and update current level */ + queue->current->max_reading_pos = rpos; + update_cur_level (queue, queue->current); + } GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing); continue; } @@ -1236,7 +1167,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, /* congfigure how much and from where to read */ if (QUEUE_IS_USING_RING_BUFFER (queue)) { file_offset = - (queue->current->rb_offset + (offset - + (queue->current->rb_offset + (rpos - queue->current->offset)) % rb_size; if (file_offset + read_length > rb_size) { block_length = rb_size - file_offset; @@ -1244,7 +1175,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, block_length = read_length; } } else { - file_offset = offset; + file_offset = rpos; block_length = read_length; } @@ -1265,7 +1196,8 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length, block_length = read_length; remaining -= read_return; - queue->current->reading_pos += read_return; + rpos = (queue->current->reading_pos += read_return); + update_cur_pos (queue, queue->current, queue->current->reading_pos); } GST_QUEUE2_SIGNAL_DEL (queue); GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining); @@ -1504,15 +1436,19 @@ out_flushing: } static gboolean -gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) +gst_queue2_write_buffer_to_file (GstQueue2 * queue, GstBuffer * buffer) { guint8 *data; guint size, rb_size; - guint64 writing_pos, new_writing_pos; + guint64 writing_pos, new_writing_pos, max_reading_pos; gint64 space; - GstQueue2Range *range, *prev; + GstQueue2Range *range, *prev, *next; - writing_pos = queue->current->rb_writing_pos; + if (QUEUE_IS_USING_RING_BUFFER (queue)) + writing_pos = queue->current->rb_writing_pos; + else + writing_pos = queue->current->writing_pos; + max_reading_pos = queue->current->max_reading_pos; rb_size = queue->ring_buffer_max_size; size = GST_BUFFER_SIZE (buffer); @@ -1521,46 +1457,77 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) while (size > 0) { guint to_write; - /* calculate the space in the ring buffer not used by data from the - * current range */ - while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) { - /* wait until there is some free space */ - GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); - } - /* get the amount of space we have */ - space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes; - - /* calculate if we need to split or if we can write the entire buffer now */ - to_write = MIN (size, space); - - /* the writing position in the ring buffer after writing (part or all of) - * the buffer */ - new_writing_pos = (writing_pos + to_write) % rb_size; - - prev = NULL; - range = queue->ranges; - - /* if we need to overwrite data in the ring buffer, we need to update the - * ranges - * warning: this code is complicated and includes some simplifications - - * pen, paper and diagrams for the cases recommended! */ - while (range) { - guint64 range_data_start, range_data_end; - GstQueue2Range *range_to_destroy = NULL; - - /* we don't edit the current range here */ - if (range == queue->current) - goto next_range; + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + /* calculate the space in the ring buffer not used by data from + * the current range */ + while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) { + /* wait until there is some free space */ + GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing); + } + /* get the amount of space we have */ + space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes; + + /* calculate if we need to split or if we can write the entire + * buffer now */ + to_write = MIN (size, space); + + /* the writing position in the ring buffer after writing (part + * or all of) the buffer */ + new_writing_pos = (writing_pos + to_write) % rb_size; + + debug_ranges (queue); + + prev = NULL; + range = queue->ranges; + + /* if we need to overwrite data in the ring buffer, we need to + * update the ranges + * + * warning: this code is complicated and includes some + * simplifications - pen, paper and diagrams for the cases + * recommended! */ + while (range) { + guint64 range_data_start, range_data_end; + GstQueue2Range *range_to_destroy = NULL; + + /* we don't edit the current range here */ + if (range == queue->current) + goto next_range; - range_data_start = range->rb_offset; - range_data_end = range->rb_writing_pos; + range_data_start = range->rb_offset; + range_data_end = range->rb_writing_pos; + + if (range_data_end > range_data_start) { + if (writing_pos >= range_data_end && new_writing_pos >= writing_pos) + goto next_range; + + if (new_writing_pos > range_data_start) { + if (new_writing_pos >= range_data_end) { + GST_DEBUG_OBJECT (queue, + "Removing range: offset %" G_GUINT64_FORMAT ", wpos %" + G_GUINT64_FORMAT, range->offset, range->writing_pos); + /* remove range */ + range_to_destroy = range; + if (prev) + prev->next = range->next; + } else { + GST_DEBUG_OBJECT (queue, + "advancing offsets from %" G_GUINT64_FORMAT " (%" + G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%" + G_GUINT64_FORMAT ")", range->offset, range->rb_offset, + range->offset + new_writing_pos - range_data_start, + new_writing_pos); + range->offset += (new_writing_pos - range_data_start); + range->rb_offset = new_writing_pos; + } + } + } else { + guint64 new_wpos_virt = writing_pos + to_write; - if (range_data_end > range_data_start) { - if (writing_pos >= range_data_end && new_writing_pos >= writing_pos) - goto next_range; + if (new_wpos_virt <= range_data_start) + goto next_range; - if (new_writing_pos > range_data_start) { - if (new_writing_pos >= range_data_end) { + if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) { GST_DEBUG_OBJECT (queue, "Removing range: offset %" G_GUINT64_FORMAT ", wpos %" G_GUINT64_FORMAT, range->offset, range->writing_pos); @@ -1575,47 +1542,28 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) G_GUINT64_FORMAT ")", range->offset, range->rb_offset, range->offset + new_writing_pos - range_data_start, new_writing_pos); - range->offset += (new_writing_pos - range_data_start); + range->offset += (new_wpos_virt - range_data_start); range->rb_offset = new_writing_pos; } } - } else { - guint64 new_wpos_virt = writing_pos + to_write; - if (new_wpos_virt <= range_data_start) - goto next_range; + next_range: + if (!range_to_destroy) + prev = range; - if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) { - GST_DEBUG_OBJECT (queue, - "Removing range: offset %" G_GUINT64_FORMAT ", wpos %" - G_GUINT64_FORMAT, range->offset, range->writing_pos); - /* remove range */ - range_to_destroy = range; - if (prev) - prev->next = range->next; - } else { - GST_DEBUG_OBJECT (queue, - "advancing offsets from %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT - ") to %" G_GUINT64_FORMAT " (%" G_GUINT64_FORMAT ")", - range->offset, range->rb_offset, - range->offset + new_writing_pos - range_data_start, - new_writing_pos); - range->offset += (new_wpos_virt - range_data_start); - range->rb_offset = new_writing_pos; + range = range->next; + if (range_to_destroy) { + if (range_to_destroy == queue->ranges) + queue->ranges = range; + g_slice_free (GstQueue2Range, range_to_destroy); + range_to_destroy = NULL; } } + debug_ranges (queue); - next_range: - if (!range_to_destroy) - prev = range; - - range = range->next; - if (range_to_destroy) { - if (range_to_destroy == queue->ranges) - queue->ranges = range; - g_slice_free (GstQueue2Range, range_to_destroy); - range_to_destroy = NULL; - } + } else { + space = to_write = size; + new_writing_pos = writing_pos + to_write; } if (FSEEK_FILE (queue->temp_file, writing_pos)) @@ -1623,9 +1571,32 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) if (new_writing_pos > writing_pos) { GST_INFO_OBJECT (queue, "writing %u bytes", to_write); - /* no wrapping, just write */ + /* either not using ring buffer or no wrapping, just write */ if (fwrite (data, to_write, 1, queue->temp_file) != 1) goto handle_error; + + if (!QUEUE_IS_USING_RING_BUFFER (queue)) { + /* try to merge with next range */ + while ((next = queue->current->next)) { + GST_INFO_OBJECT (queue, + "checking merge with next range %" G_GUINT64_FORMAT " < %" + G_GUINT64_FORMAT, new_writing_pos, next->offset); + if (new_writing_pos < next->offset) + break; + + GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT, + next->writing_pos); + /* we will run over the offset of the next group */ + queue->current->writing_pos = new_writing_pos = next->writing_pos; + + /* remove the group */ + queue->current->next = next->next; + g_slice_free (GstQueue2Range, next); + + debug_ranges (queue); + } + goto update_and_signal; + } } else { /* wrapping */ guint block_one, block_two; @@ -1650,17 +1621,22 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer) } } + update_and_signal: /* update the writing positions */ size -= to_write; - data += to_write; - queue->current->writing_pos += to_write; - queue->current->rb_writing_pos = writing_pos = new_writing_pos; - update_cur_level (queue, queue->current); - GST_INFO_OBJECT (queue, "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)", to_write, writing_pos, size); + if (QUEUE_IS_USING_RING_BUFFER (queue)) { + data += to_write; + queue->current->writing_pos += to_write; + queue->current->rb_writing_pos = writing_pos = new_writing_pos; + } else { + queue->current->writing_pos = writing_pos = new_writing_pos; + } + update_cur_level (queue, queue->current); + GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %u)", queue->cur_level.bytes, QUEUE_MAX_BYTES (queue)); @@ -1721,13 +1697,8 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item) /* update the byterate stats */ update_in_rates (queue); - /* FIXME - check return values? */ - if (QUEUE_IS_USING_RING_BUFFER (queue)) { - gst_queue2_write_buffer_to_ring_buffer (queue, buffer); - } else if (QUEUE_IS_USING_TEMP_FILE (queue)) { - gst_queue2_write_buffer_to_file (queue, buffer); - } - + /* FIXME - check return value? */ + gst_queue2_write_buffer_to_file (queue, buffer); } else if (GST_IS_EVENT (item)) { GstEvent *event; |