diff options
author | Thiago Sousa Santos <thiagossantos@gmail.com> | 2007-06-05 16:14:23 +0000 |
---|---|---|
committer | Sebastian Dröge <sebastian.droege@collabora.co.uk> | 2009-10-29 11:17:12 +0100 |
commit | 14d39928f5f32856c22e9163f17026dc0385853c (patch) | |
tree | 8e3de4eae746e2069fe52a87473fb3254e14dd56 /gst | |
parent | b2486d65c64bf7320aeb0e859bd7f95c7e40f57b (diff) |
gst/playback/gstqueue2.c: Add support for filebased buffering. Fixes #441264.
Original commit message from CVS:
Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
* gst/playback/gstqueue2.c: (gst_queue_class_init),
(gst_queue_init), (gst_queue_finalize),
(gst_queue_write_buffer_to_file), (gst_queue_have_data),
(gst_queue_create_read), (gst_queue_read_item_from_file),
(gst_queue_open_temp_location_file),
(gst_queue_close_temp_location_file), (gst_queue_locked_flush),
(gst_queue_locked_enqueue), (gst_queue_locked_dequeue),
(gst_queue_is_empty), (gst_queue_is_filled),
(gst_queue_change_state), (gst_queue_set_temp_location),
(gst_queue_set_property):
Add support for filebased buffering. Fixes #441264.
Diffstat (limited to 'gst')
-rw-r--r-- | gst/playback/gstqueue2.c | 329 |
1 files changed, 314 insertions, 15 deletions
diff --git a/gst/playback/gstqueue2.c b/gst/playback/gstqueue2.c index 23663f1ad0..10949eeec5 100644 --- a/gst/playback/gstqueue2.c +++ b/gst/playback/gstqueue2.c @@ -1,9 +1,8 @@ /* GStreamer * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> - * 2000 Wim Taymans <wtay@chello.be> * 2003 Colin Walters <cwalters@gnome.org> - * 2005 Wim Taymans <wim@fluendo.com> - * 2007 Wim Taymans <wim@fluendo.com> + * 2000,2005,2007 Wim Taymans <wim@fluendo.com> + * 2007 Thiago Sousa Santos <thiagossantos at gmail dot com> * * gstqueue2.c: * @@ -41,12 +40,20 @@ * * The default queue size limits are 100 buffers, 2MB of data, or * two seconds worth of data, whichever is reached first. + * + * If you set temp-location, the element will buffer data on the file + * specified by it. By using this, it will buffer the entire + * stream data on the file independently of the queue size limits, they + * will only be used for buffering statistics. * */ + #ifdef HAVE_CONFIG_H #include "config.h" #endif +#include <glib/gstdio.h> + #include <gst/gst.h> #include <gst/gst-i18n-plugin.h> @@ -83,6 +90,10 @@ enum #define DEFAULT_LOW_PERCENT 10 #define DEFAULT_HIGH_PERCENT 99 +/* other defines */ +#define DEFAULT_BUFFER_SIZE 4096 +#define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL) + enum { PROP_0, @@ -176,6 +187,13 @@ struct _GstQueue /* temp location stuff */ gchar *temp_location; + FILE *temp_file; + guint64 writing_pos; + guint64 reading_pos; + /* we need this to send the first new segment event of the stream + * because we can't save it on the file */ + gboolean segment_event_received; + GstEvent *starting_segment; }; struct _GstQueueClass @@ -195,7 +213,9 @@ struct _GstQueueClass queue->max_level.bytes, \ queue->cur_level.time, \ queue->max_level.time, \ - queue->queue->length) + QUEUE_IS_USING_TEMP_FILE(queue) ? \ + queue->writing_pos - queue->reading_pos : \ + queue->queue->length) #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ g_mutex_lock (q->qlock); \ @@ -371,7 +391,7 @@ gst_queue_class_init (GstQueueClass * klass) g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, g_param_spec_string ("temp-location", "Temporary File Location", - "Location of a temporary file to store data in (unused)", + "Location of a temporary file to store data in", NULL, G_PARAM_READWRITE)); gst_element_class_add_pad_template (gstelement_class, @@ -443,6 +463,10 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class) queue->item_del = g_cond_new (); queue->queue = g_queue_new (); + /* tempfile related */ + queue->temp_location = NULL; + queue->temp_file = NULL; + GST_DEBUG_OBJECT (queue, "initialized queue's not_empty & not_full conditions"); } @@ -460,12 +484,17 @@ gst_queue_finalize (GObject * object) gst_mini_object_unref (data); } + g_queue_free (queue->queue); g_mutex_free (queue->qlock); g_cond_free (queue->item_add); g_cond_free (queue->item_del); g_timer_destroy (queue->timer); + /* temp_file path cleanup */ + if (queue->temp_location != NULL) + g_free (queue->temp_location); + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -720,19 +749,208 @@ update_rates (GstQueue * queue) } static void +gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer) +{ + guint size; + guint8 *data; + + fseek (queue->temp_file, queue->writing_pos, SEEK_SET); + + data = GST_BUFFER_DATA (buffer); + size = GST_BUFFER_SIZE (buffer); + + fwrite (data, 1, size, queue->temp_file); + queue->writing_pos += size; +} + +/* see if there is enough data in the file to read a full buffer */ +static gboolean +gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) +{ + GST_DEBUG_OBJECT (queue, + "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset, + length, queue->writing_pos); + if (queue->is_eos) + return TRUE; + + if (offset + length < queue->writing_pos) + return TRUE; + + return FALSE; +} + +static GstFlowReturn +gst_queue_create_read (GstQueue * queue, guint64 offset, guint length, + GstBuffer ** buffer) +{ + size_t res; + GstBuffer *buf; + off_t sres; + + /* check if we have enough data at @offset. If there is not enough data, we + * block and wait. */ + while (!gst_queue_have_data (queue, offset, length)) { + GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); + } + + sres = fseeko (queue->temp_file, offset, SEEK_SET); + if (G_UNLIKELY (sres < 0)) + goto seek_failed; + + buf = gst_buffer_new_and_alloc (length); + + /* this should not block */ + GST_LOG_OBJECT (queue, "Reading %d bytes", length); + res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file); + GST_LOG_OBJECT (queue, "read %d bytes", res); + + if (G_UNLIKELY (res == 0)) { + /* check for errors or EOF */ + if (ferror (queue->temp_file)) + goto could_not_read; + if (feof (queue->temp_file) && length > 0) + goto eos; + } + + length = res; + + GST_BUFFER_SIZE (buf) = length; + GST_BUFFER_OFFSET (buf) = offset; + GST_BUFFER_OFFSET_END (buf) = offset + length; + + *buffer = buf; + + queue->reading_pos = offset + length; + + return GST_FLOW_OK; + + /* ERRORS */ +out_flushing: + { + GST_DEBUG_OBJECT (queue, "we are flushing"); + return GST_FLOW_WRONG_STATE; + } +seek_failed: + { + GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + return GST_FLOW_ERROR; + } +could_not_read: + { + GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +eos: + { + GST_DEBUG ("non-regular file hits EOS"); + gst_buffer_unref (buf); + return GST_FLOW_UNEXPECTED; + } +} + +static GstMiniObject * +gst_queue_read_item_from_file (GstQueue * queue) +{ + GstMiniObject *item; + + if (queue->starting_segment != NULL) { + item = GST_MINI_OBJECT_CAST (queue->starting_segment); + queue->starting_segment = NULL; + } else { + GstFlowReturn ret; + GstBuffer *buffer; + + ret = + gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE, + &buffer); + switch (ret) { + case GST_FLOW_OK: + item = GST_MINI_OBJECT_CAST (buffer); + break; + case GST_FLOW_UNEXPECTED: + item = GST_MINI_OBJECT_CAST (gst_event_new_eos ()); + break; + default: + item = NULL; + break; + } + } + return item; +} + +static gboolean +gst_queue_open_temp_location_file (GstQueue * queue) +{ + /* nothing to do */ + if (queue->temp_location == NULL) + goto no_filename; + + /* open the file for update/writing */ + queue->temp_file = g_fopen (queue->temp_location, "wb+"); + /* error creating file */ + if (queue->temp_file == NULL) + goto open_failed; + + queue->writing_pos = 0; + queue->reading_pos = 0; + + return TRUE; + + /* ERRORS */ +no_filename: + { + GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, + (_("No file name specified.")), (NULL)); + return FALSE; + } +open_failed: + { + GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, + (_("Could not open file \"%s\" for reading."), queue->temp_location), + GST_ERROR_SYSTEM); + return FALSE; + } +} + +static void +gst_queue_close_temp_location_file (GstQueue * queue) +{ + /* nothing to do */ + if (queue->temp_file == NULL) + return; + + /* we don't remove the file so that the application can use it as a cache + * later on */ + fflush (queue->temp_file); + fclose (queue->temp_file); + remove (queue->temp_location); + queue->temp_file = NULL; +} + +static void gst_queue_locked_flush (GstQueue * queue) { - while (!g_queue_is_empty (queue->queue)) { - GstMiniObject *data = g_queue_pop_head (queue->queue); + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + gst_queue_close_temp_location_file (queue); + gst_queue_open_temp_location_file (queue); + } else { + while (!g_queue_is_empty (queue->queue)) { + GstMiniObject *data = g_queue_pop_head (queue->queue); - /* Then lose another reference because we are supposed to destroy that - data when flushing */ - gst_mini_object_unref (data); + /* Then lose another reference because we are supposed to destroy that + data when flushing */ + gst_mini_object_unref (data); + } } GST_QUEUE_CLEAR_LEVEL (queue->cur_level); gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); queue->is_eos = FALSE; + if (queue->starting_segment != NULL) + gst_event_unref (queue->starting_segment); + queue->starting_segment = NULL; + queue->segment_event_received = FALSE; /* we deleted a lot of something */ GST_QUEUE_SIGNAL_DEL (queue); @@ -760,6 +978,10 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) /* update the buffering status */ update_buffering (queue); + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + gst_queue_write_buffer_to_file (queue, buffer); + } + } else if (GST_IS_EVENT (item)) { GstEvent *event; @@ -773,8 +995,19 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) break; case GST_EVENT_NEWSEGMENT: apply_segment (queue, event, &queue->sink_segment); + /* This is our first new segment, we hold it + * as we can't save it on the temp file */ + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (queue->segment_event_received) + goto unexpected_event; + + queue->segment_event_received = TRUE; + queue->starting_segment = event; + } break; default: + if (QUEUE_IS_USING_TEMP_FILE (queue)) + goto unexpected_event; break; } } else { @@ -784,9 +1017,22 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item) item = NULL; } - if (item) + if (!QUEUE_IS_USING_TEMP_FILE (queue) && item) g_queue_push_tail (queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); + + return; + + /* ERRORS */ +unexpected_event: + { + g_warning + ("Unexpected event of kind %s can't be added in temp file of queue %s ", + gst_event_type_get_name (GST_EVENT_TYPE (item)), + GST_OBJECT_NAME (queue)); + gst_event_unref (GST_EVENT_CAST (item)); + return; + } } /* dequeue an item from the queue and update level stats */ @@ -795,7 +1041,11 @@ gst_queue_locked_dequeue (GstQueue * queue) { GstMiniObject *item; - item = g_queue_pop_head (queue->queue); + if (QUEUE_IS_USING_TEMP_FILE (queue)) + item = gst_queue_read_item_from_file (queue); + else + item = g_queue_pop_head (queue->queue); + if (item == NULL) goto no_item; @@ -927,8 +1177,12 @@ gst_queue_is_empty (GstQueue * queue) if (queue->is_eos) return FALSE; - if (queue->queue->length == 0) - return TRUE; + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + return queue->writing_pos == queue->reading_pos; + } else { + if (queue->queue->length == 0) + return TRUE; + } return FALSE; } @@ -942,6 +1196,10 @@ gst_queue_is_filled (GstQueue * queue) if (queue->is_eos) return TRUE; + /* if using file, we're never filled if we don't have EOS */ + if (QUEUE_IS_USING_TEMP_FILE (queue)) + return FALSE; + #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ (queue->cur_level.format) >= (queue->max_level.format)) @@ -1221,6 +1479,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: + if (QUEUE_IS_USING_TEMP_FILE (queue)) { + if (!gst_queue_open_temp_location_file (queue)) + ret = GST_STATE_CHANGE_FAILURE; + } + queue->segment_event_received = FALSE; + queue->starting_segment = NULL; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; @@ -1234,6 +1498,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: + if (QUEUE_IS_USING_TEMP_FILE (queue)) + gst_queue_close_temp_location_file (queue); + if (queue->starting_segment != NULL) { + gst_event_unref (queue->starting_segment); + queue->starting_segment = NULL; + } break; case GST_STATE_CHANGE_READY_TO_NULL: break; @@ -1257,6 +1527,35 @@ gst_queue_change_state (GstElement * element, GstStateChange transition) #define QUEUE_THRESHOLD_CHANGE(q)\ g_cond_signal (queue->item_add); +static gboolean +gst_queue_set_temp_location (GstQueue * queue, const gchar * location) +{ + GstState state; + + /* the element must be stopped in order to do this */ + GST_OBJECT_LOCK (queue); + state = GST_STATE (queue); + if (state != GST_STATE_READY && state != GST_STATE_NULL) + goto wrong_state; + GST_OBJECT_UNLOCK (queue); + + /* set new location */ + g_free (queue->temp_location); + queue->temp_location = g_strdup (location); + + g_object_notify (G_OBJECT (queue), "temp-location"); + + return TRUE; + +/* ERROR */ +wrong_state: + { + GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state"); + GST_OBJECT_UNLOCK (queue); + return FALSE; + } +} + static void gst_queue_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -1296,7 +1595,7 @@ gst_queue_set_property (GObject * object, queue->high_percent = g_value_get_int (value); break; case PROP_TEMP_LOCATION: - queue->temp_location = g_value_dup_string (value); + gst_queue_set_temp_location (queue, g_value_dup_string (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |