summaryrefslogtreecommitdiff
path: root/gst
diff options
context:
space:
mode:
authorThiago Sousa Santos <thiagossantos@gmail.com>2007-06-05 16:14:23 +0000
committerSebastian Dröge <sebastian.droege@collabora.co.uk>2009-10-29 11:17:12 +0100
commit14d39928f5f32856c22e9163f17026dc0385853c (patch)
tree8e3de4eae746e2069fe52a87473fb3254e14dd56 /gst
parentb2486d65c64bf7320aeb0e859bd7f95c7e40f57b (diff)
gst/playback/gstqueue2.c: Add support for filebased buffering. Fixes #441264.
Original commit message from CVS: Based on patch by: Thiago Sousa Santos <thiagossantos at gmail dot com> * gst/playback/gstqueue2.c: (gst_queue_class_init), (gst_queue_init), (gst_queue_finalize), (gst_queue_write_buffer_to_file), (gst_queue_have_data), (gst_queue_create_read), (gst_queue_read_item_from_file), (gst_queue_open_temp_location_file), (gst_queue_close_temp_location_file), (gst_queue_locked_flush), (gst_queue_locked_enqueue), (gst_queue_locked_dequeue), (gst_queue_is_empty), (gst_queue_is_filled), (gst_queue_change_state), (gst_queue_set_temp_location), (gst_queue_set_property): Add support for filebased buffering. Fixes #441264.
Diffstat (limited to 'gst')
-rw-r--r--gst/playback/gstqueue2.c329
1 files changed, 314 insertions, 15 deletions
diff --git a/gst/playback/gstqueue2.c b/gst/playback/gstqueue2.c
index 23663f1ad0..10949eeec5 100644
--- a/gst/playback/gstqueue2.c
+++ b/gst/playback/gstqueue2.c
@@ -1,9 +1,8 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
- * 2000 Wim Taymans <wtay@chello.be>
* 2003 Colin Walters <cwalters@gnome.org>
- * 2005 Wim Taymans <wim@fluendo.com>
- * 2007 Wim Taymans <wim@fluendo.com>
+ * 2000,2005,2007 Wim Taymans <wim@fluendo.com>
+ * 2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
*
* gstqueue2.c:
*
@@ -41,12 +40,20 @@
*
* The default queue size limits are 100 buffers, 2MB of data, or
* two seconds worth of data, whichever is reached first.
+ *
+ * If you set temp-location, the element will buffer data on the file
+ * specified by it. By using this, it will buffer the entire
+ * stream data on the file independently of the queue size limits, they
+ * will only be used for buffering statistics.
*
*/
+
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
+#include <glib/gstdio.h>
+
#include <gst/gst.h>
#include <gst/gst-i18n-plugin.h>
@@ -83,6 +90,10 @@ enum
#define DEFAULT_LOW_PERCENT 10
#define DEFAULT_HIGH_PERCENT 99
+/* other defines */
+#define DEFAULT_BUFFER_SIZE 4096
+#define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL)
+
enum
{
PROP_0,
@@ -176,6 +187,13 @@ struct _GstQueue
/* temp location stuff */
gchar *temp_location;
+ FILE *temp_file;
+ guint64 writing_pos;
+ guint64 reading_pos;
+ /* we need this to send the first new segment event of the stream
+ * because we can't save it on the file */
+ gboolean segment_event_received;
+ GstEvent *starting_segment;
};
struct _GstQueueClass
@@ -195,7 +213,9 @@ struct _GstQueueClass
queue->max_level.bytes, \
queue->cur_level.time, \
queue->max_level.time, \
- queue->queue->length)
+ QUEUE_IS_USING_TEMP_FILE(queue) ? \
+ queue->writing_pos - queue->reading_pos : \
+ queue->queue->length)
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
g_mutex_lock (q->qlock); \
@@ -371,7 +391,7 @@ gst_queue_class_init (GstQueueClass * klass)
g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
g_param_spec_string ("temp-location", "Temporary File Location",
- "Location of a temporary file to store data in (unused)",
+ "Location of a temporary file to store data in",
NULL, G_PARAM_READWRITE));
gst_element_class_add_pad_template (gstelement_class,
@@ -443,6 +463,10 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
queue->item_del = g_cond_new ();
queue->queue = g_queue_new ();
+ /* tempfile related */
+ queue->temp_location = NULL;
+ queue->temp_file = NULL;
+
GST_DEBUG_OBJECT (queue,
"initialized queue's not_empty & not_full conditions");
}
@@ -460,12 +484,17 @@ gst_queue_finalize (GObject * object)
gst_mini_object_unref (data);
}
+
g_queue_free (queue->queue);
g_mutex_free (queue->qlock);
g_cond_free (queue->item_add);
g_cond_free (queue->item_del);
g_timer_destroy (queue->timer);
+ /* temp_file path cleanup */
+ if (queue->temp_location != NULL)
+ g_free (queue->temp_location);
+
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@@ -720,19 +749,208 @@ update_rates (GstQueue * queue)
}
static void
+gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer)
+{
+ guint size;
+ guint8 *data;
+
+ fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
+
+ data = GST_BUFFER_DATA (buffer);
+ size = GST_BUFFER_SIZE (buffer);
+
+ fwrite (data, 1, size, queue->temp_file);
+ queue->writing_pos += size;
+}
+
+/* see if there is enough data in the file to read a full buffer */
+static gboolean
+gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
+{
+ GST_DEBUG_OBJECT (queue,
+ "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset,
+ length, queue->writing_pos);
+ if (queue->is_eos)
+ return TRUE;
+
+ if (offset + length < queue->writing_pos)
+ return TRUE;
+
+ return FALSE;
+}
+
+static GstFlowReturn
+gst_queue_create_read (GstQueue * queue, guint64 offset, guint length,
+ GstBuffer ** buffer)
+{
+ size_t res;
+ GstBuffer *buf;
+ off_t sres;
+
+ /* check if we have enough data at @offset. If there is not enough data, we
+ * block and wait. */
+ while (!gst_queue_have_data (queue, offset, length)) {
+ GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
+ }
+
+ sres = fseeko (queue->temp_file, offset, SEEK_SET);
+ if (G_UNLIKELY (sres < 0))
+ goto seek_failed;
+
+ buf = gst_buffer_new_and_alloc (length);
+
+ /* this should not block */
+ GST_LOG_OBJECT (queue, "Reading %d bytes", length);
+ res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file);
+ GST_LOG_OBJECT (queue, "read %d bytes", res);
+
+ if (G_UNLIKELY (res == 0)) {
+ /* check for errors or EOF */
+ if (ferror (queue->temp_file))
+ goto could_not_read;
+ if (feof (queue->temp_file) && length > 0)
+ goto eos;
+ }
+
+ length = res;
+
+ GST_BUFFER_SIZE (buf) = length;
+ GST_BUFFER_OFFSET (buf) = offset;
+ GST_BUFFER_OFFSET_END (buf) = offset + length;
+
+ *buffer = buf;
+
+ queue->reading_pos = offset + length;
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+out_flushing:
+ {
+ GST_DEBUG_OBJECT (queue, "we are flushing");
+ return GST_FLOW_WRONG_STATE;
+ }
+seek_failed:
+ {
+ GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ return GST_FLOW_ERROR;
+ }
+could_not_read:
+ {
+ GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+eos:
+ {
+ GST_DEBUG ("non-regular file hits EOS");
+ gst_buffer_unref (buf);
+ return GST_FLOW_UNEXPECTED;
+ }
+}
+
+static GstMiniObject *
+gst_queue_read_item_from_file (GstQueue * queue)
+{
+ GstMiniObject *item;
+
+ if (queue->starting_segment != NULL) {
+ item = GST_MINI_OBJECT_CAST (queue->starting_segment);
+ queue->starting_segment = NULL;
+ } else {
+ GstFlowReturn ret;
+ GstBuffer *buffer;
+
+ ret =
+ gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE,
+ &buffer);
+ switch (ret) {
+ case GST_FLOW_OK:
+ item = GST_MINI_OBJECT_CAST (buffer);
+ break;
+ case GST_FLOW_UNEXPECTED:
+ item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
+ break;
+ default:
+ item = NULL;
+ break;
+ }
+ }
+ return item;
+}
+
+static gboolean
+gst_queue_open_temp_location_file (GstQueue * queue)
+{
+ /* nothing to do */
+ if (queue->temp_location == NULL)
+ goto no_filename;
+
+ /* open the file for update/writing */
+ queue->temp_file = g_fopen (queue->temp_location, "wb+");
+ /* error creating file */
+ if (queue->temp_file == NULL)
+ goto open_failed;
+
+ queue->writing_pos = 0;
+ queue->reading_pos = 0;
+
+ return TRUE;
+
+ /* ERRORS */
+no_filename:
+ {
+ GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
+ (_("No file name specified.")), (NULL));
+ return FALSE;
+ }
+open_failed:
+ {
+ GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
+ (_("Could not open file \"%s\" for reading."), queue->temp_location),
+ GST_ERROR_SYSTEM);
+ return FALSE;
+ }
+}
+
+static void
+gst_queue_close_temp_location_file (GstQueue * queue)
+{
+ /* nothing to do */
+ if (queue->temp_file == NULL)
+ return;
+
+ /* we don't remove the file so that the application can use it as a cache
+ * later on */
+ fflush (queue->temp_file);
+ fclose (queue->temp_file);
+ remove (queue->temp_location);
+ queue->temp_file = NULL;
+}
+
+static void
gst_queue_locked_flush (GstQueue * queue)
{
- while (!g_queue_is_empty (queue->queue)) {
- GstMiniObject *data = g_queue_pop_head (queue->queue);
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ gst_queue_close_temp_location_file (queue);
+ gst_queue_open_temp_location_file (queue);
+ } else {
+ while (!g_queue_is_empty (queue->queue)) {
+ GstMiniObject *data = g_queue_pop_head (queue->queue);
- /* Then lose another reference because we are supposed to destroy that
- data when flushing */
- gst_mini_object_unref (data);
+ /* Then lose another reference because we are supposed to destroy that
+ data when flushing */
+ gst_mini_object_unref (data);
+ }
}
GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
queue->is_eos = FALSE;
+ if (queue->starting_segment != NULL)
+ gst_event_unref (queue->starting_segment);
+ queue->starting_segment = NULL;
+ queue->segment_event_received = FALSE;
/* we deleted a lot of something */
GST_QUEUE_SIGNAL_DEL (queue);
@@ -760,6 +978,10 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
/* update the buffering status */
update_buffering (queue);
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ gst_queue_write_buffer_to_file (queue, buffer);
+ }
+
} else if (GST_IS_EVENT (item)) {
GstEvent *event;
@@ -773,8 +995,19 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
break;
case GST_EVENT_NEWSEGMENT:
apply_segment (queue, event, &queue->sink_segment);
+ /* This is our first new segment, we hold it
+ * as we can't save it on the temp file */
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (queue->segment_event_received)
+ goto unexpected_event;
+
+ queue->segment_event_received = TRUE;
+ queue->starting_segment = event;
+ }
break;
default:
+ if (QUEUE_IS_USING_TEMP_FILE (queue))
+ goto unexpected_event;
break;
}
} else {
@@ -784,9 +1017,22 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
item = NULL;
}
- if (item)
+ if (!QUEUE_IS_USING_TEMP_FILE (queue) && item)
g_queue_push_tail (queue->queue, item);
GST_QUEUE_SIGNAL_ADD (queue);
+
+ return;
+
+ /* ERRORS */
+unexpected_event:
+ {
+ g_warning
+ ("Unexpected event of kind %s can't be added in temp file of queue %s ",
+ gst_event_type_get_name (GST_EVENT_TYPE (item)),
+ GST_OBJECT_NAME (queue));
+ gst_event_unref (GST_EVENT_CAST (item));
+ return;
+ }
}
/* dequeue an item from the queue and update level stats */
@@ -795,7 +1041,11 @@ gst_queue_locked_dequeue (GstQueue * queue)
{
GstMiniObject *item;
- item = g_queue_pop_head (queue->queue);
+ if (QUEUE_IS_USING_TEMP_FILE (queue))
+ item = gst_queue_read_item_from_file (queue);
+ else
+ item = g_queue_pop_head (queue->queue);
+
if (item == NULL)
goto no_item;
@@ -927,8 +1177,12 @@ gst_queue_is_empty (GstQueue * queue)
if (queue->is_eos)
return FALSE;
- if (queue->queue->length == 0)
- return TRUE;
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ return queue->writing_pos == queue->reading_pos;
+ } else {
+ if (queue->queue->length == 0)
+ return TRUE;
+ }
return FALSE;
}
@@ -942,6 +1196,10 @@ gst_queue_is_filled (GstQueue * queue)
if (queue->is_eos)
return TRUE;
+ /* if using file, we're never filled if we don't have EOS */
+ if (QUEUE_IS_USING_TEMP_FILE (queue))
+ return FALSE;
+
#define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
(queue->cur_level.format) >= (queue->max_level.format))
@@ -1221,6 +1479,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
+ if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!gst_queue_open_temp_location_file (queue))
+ ret = GST_STATE_CHANGE_FAILURE;
+ }
+ queue->segment_event_received = FALSE;
+ queue->starting_segment = NULL;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
@@ -1234,6 +1498,12 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ if (QUEUE_IS_USING_TEMP_FILE (queue))
+ gst_queue_close_temp_location_file (queue);
+ if (queue->starting_segment != NULL) {
+ gst_event_unref (queue->starting_segment);
+ queue->starting_segment = NULL;
+ }
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
@@ -1257,6 +1527,35 @@ gst_queue_change_state (GstElement * element, GstStateChange transition)
#define QUEUE_THRESHOLD_CHANGE(q)\
g_cond_signal (queue->item_add);
+static gboolean
+gst_queue_set_temp_location (GstQueue * queue, const gchar * location)
+{
+ GstState state;
+
+ /* the element must be stopped in order to do this */
+ GST_OBJECT_LOCK (queue);
+ state = GST_STATE (queue);
+ if (state != GST_STATE_READY && state != GST_STATE_NULL)
+ goto wrong_state;
+ GST_OBJECT_UNLOCK (queue);
+
+ /* set new location */
+ g_free (queue->temp_location);
+ queue->temp_location = g_strdup (location);
+
+ g_object_notify (G_OBJECT (queue), "temp-location");
+
+ return TRUE;
+
+/* ERROR */
+wrong_state:
+ {
+ GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state");
+ GST_OBJECT_UNLOCK (queue);
+ return FALSE;
+ }
+}
+
static void
gst_queue_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
@@ -1296,7 +1595,7 @@ gst_queue_set_property (GObject * object,
queue->high_percent = g_value_get_int (value);
break;
case PROP_TEMP_LOCATION:
- queue->temp_location = g_value_dup_string (value);
+ gst_queue_set_temp_location (queue, g_value_dup_string (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);