summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2010-06-15 16:12:02 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2010-08-13 16:38:53 +0200
commit73e27fb0173a9d5464597f0469c50f2a3294652d (patch)
treeba40486e7c42bca453f314ec7a932c1bc243f6c7 /plugins
parent6339bd0bec967081a0b6c1b2f85fa8794043e0b5 (diff)
queue2; cleanups and fixes
Make a macro for some frequent checks Emit the removed signal in all cases when we remove something
Diffstat (limited to 'plugins')
-rw-r--r--plugins/elements/gstqueue2.c103
1 files changed, 47 insertions, 56 deletions
diff --git a/plugins/elements/gstqueue2.c b/plugins/elements/gstqueue2.c
index 07bd69d4bc..c79c32763e 100644
--- a/plugins/elements/gstqueue2.c
+++ b/plugins/elements/gstqueue2.c
@@ -97,6 +97,7 @@ enum
#define DEFAULT_BUFFER_SIZE 4096
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->use_ring_buffer) /* for consistency with the above macro */
+#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))
/* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS 100 /* 100 buffers */
@@ -148,7 +149,7 @@ enum
queue->max_level.bytes, \
queue->cur_level.time, \
queue->max_level.time, \
- (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
+ (guint64) (!QUEUE_IS_USING_QUEUE(queue) ? \
queue->current->writing_pos - queue->current->max_reading_pos : \
queue->queue->length))
@@ -792,7 +793,7 @@ update_buffering (GstQueue2 * queue)
queue->buffering_percent = percent;
- if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
GstFormat fmt = GST_FORMAT_BYTES;
gint64 duration;
@@ -1105,11 +1106,12 @@ gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
/* we don't have the range, see how far away we are, FIXME, find a good
* threshold based on the incomming rate. */
if (!queue->is_eos && queue->current) {
- if (QUEUE_IS_USING_RING_BUFFER (queue) && (offset < queue->current->offset
- || offset >
- queue->current->writing_pos + queue->max_level.bytes -
- queue->cur_level.bytes)) {
- perform_seek_to_offset (queue, offset);
+ if (QUEUE_IS_USING_RING_BUFFER (queue)) {
+ if (offset < queue->current->offset || offset >
+ queue->current->writing_pos + queue->max_level.bytes -
+ queue->cur_level.bytes) {
+ perform_seek_to_offset (queue, offset);
+ }
} else if (offset < queue->current->writing_pos + 200000) {
update_cur_pos (queue, queue->current, offset + length);
GST_INFO_OBJECT (queue, "wait for data");
@@ -1174,6 +1176,7 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
guint8 *data;
guint64 file_offset;
guint block_length, remaining, read_length;
+ gint64 read_return;
/* allocate the output buffer of the requested size */
buf = gst_buffer_new_and_alloc (length);
@@ -1221,21 +1224,15 @@ gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
/* while we still have data to read, we loop */
while (read_length > 0) {
- gint64 read_return;
-
read_return =
gst_queue2_read_data_at_offset (queue, file_offset, block_length,
data);
- if (read_return < 0) {
- gst_buffer_unref (buf);
- return read_return;
- }
+ if (read_return < 0)
+ goto read_error;
- if (QUEUE_IS_USING_RING_BUFFER (queue)) {
- file_offset = (file_offset + read_return) % queue->ring_buffer_max_size;
- } else {
- file_offset += read_return;
- }
+ file_offset += read_return;
+ if (QUEUE_IS_USING_RING_BUFFER (queue))
+ file_offset %= queue->ring_buffer_max_size;
data += read_return;
read_length -= read_return;
@@ -1261,6 +1258,12 @@ out_flushing:
GST_DEBUG_OBJECT (queue, "we are flushing");
return GST_FLOW_WRONG_STATE;
}
+read_error:
+ {
+ GST_DEBUG_OBJECT (queue, "we have a read error");
+ gst_buffer_unref (buf);
+ return read_return;
+ }
}
/* should be called with QUEUE_LOCK */
@@ -1282,6 +1285,7 @@ gst_queue2_read_item_from_file (GstQueue2 * queue)
ret =
gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
&buffer);
+
switch (ret) {
case GST_FLOW_OK:
item = GST_MINI_OBJECT_CAST (buffer);
@@ -1419,7 +1423,7 @@ gst_queue2_flush_temp_file (GstQueue2 * queue)
static void
gst_queue2_locked_flush (GstQueue2 * queue)
{
- if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
gst_queue2_flush_temp_file (queue);
} else {
while (!g_queue_is_empty (queue->queue)) {
@@ -1493,15 +1497,16 @@ gst_queue2_write_buffer_to_ring_buffer (GstQueue2 * queue, GstBuffer * buffer)
rem = buffer;
- rb_space =
- queue->ring_buffer_max_size - (queue->current->writing_pos -
- queue->current->reading_pos);
- while (rb_space <= 0) {
- GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
+ do {
rb_space =
queue->ring_buffer_max_size - (queue->current->writing_pos -
queue->current->reading_pos);
- }
+
+ if (rb_space > 0)
+ break;
+
+ GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
+ } while (TRUE);
/* loop if we can't write the whole buffer at once */
do {
@@ -1718,8 +1723,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
size = GST_BUFFER_SIZE (buffer);
/* add buffer to the statistics */
- if (!(QUEUE_IS_USING_TEMP_FILE (queue)
- || QUEUE_IS_USING_RING_BUFFER (queue))) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
queue->cur_level.buffers++;
queue->cur_level.bytes += size;
}
@@ -1753,8 +1757,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
apply_segment (queue, event, &queue->sink_segment);
/* This is our first new segment, we hold it
* as we can't save it on the temp file */
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- || QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
if (queue->segment_event_received)
goto unexpected_event;
@@ -1769,8 +1772,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
queue->unexpected = FALSE;
break;
default:
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- || QUEUE_IS_USING_TEMP_FILE (queue))
+ if (!QUEUE_IS_USING_QUEUE (queue))
goto unexpected_event;
break;
}
@@ -1785,8 +1787,7 @@ gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item)
/* update the buffering status */
update_buffering (queue);
- if (!(QUEUE_IS_USING_TEMP_FILE (queue)
- || QUEUE_IS_USING_RING_BUFFER (queue)))
+ if (QUEUE_IS_USING_QUEUE (queue))
g_queue_push_tail (queue->queue, item);
else
gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
@@ -1815,7 +1816,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
{
GstMiniObject *item;
- if (QUEUE_IS_USING_TEMP_FILE (queue) || QUEUE_IS_USING_RING_BUFFER (queue))
+ if (!QUEUE_IS_USING_QUEUE (queue))
item = gst_queue2_read_item_from_file (queue);
else
item = g_queue_pop_head (queue->queue);
@@ -1833,8 +1834,7 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"retrieved buffer %p from queue", buffer);
- if (!(QUEUE_IS_USING_TEMP_FILE (queue)
- || QUEUE_IS_USING_RING_BUFFER (queue))) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
queue->cur_level.buffers--;
queue->cur_level.bytes -= size;
}
@@ -1863,14 +1863,13 @@ gst_queue2_locked_dequeue (GstQueue2 * queue)
default:
break;
}
- GST_QUEUE2_SIGNAL_DEL (queue);
} else {
g_warning
("Unexpected item %p dequeued from queue %s (refcounting problem?)",
item, GST_OBJECT_NAME (queue));
item = NULL;
- GST_QUEUE2_SIGNAL_DEL (queue);
}
+ GST_QUEUE2_SIGNAL_DEL (queue);
return item;
@@ -1893,8 +1892,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_FLUSH_START:
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event");
- if (!QUEUE_IS_USING_RING_BUFFER (queue)
- || !QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@@ -1918,8 +1916,7 @@ gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event)
{
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event");
- if (!QUEUE_IS_USING_RING_BUFFER (queue)
- || !QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
/* forward event */
gst_pad_push_event (queue->srcpad, event);
@@ -1985,8 +1982,7 @@ gst_queue2_is_empty (GstQueue2 * queue)
if (queue->is_eos)
return FALSE;
- if ((QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue))
- && queue->current) {
+ if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
return queue->current->writing_pos <= queue->current->max_reading_pos;
} else {
if (queue->queue->length == 0)
@@ -2282,8 +2278,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- || !QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
/* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event);
} else {
@@ -2300,8 +2295,7 @@ gst_queue2_handle_src_event (GstPad * pad, GstEvent * event)
}
break;
case GST_EVENT_FLUSH_STOP:
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- || !QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
/* just forward upstream */
res = gst_pad_push_event (queue->sinkpad, event);
} else {
@@ -2391,8 +2385,7 @@ gst_queue2_handle_src_query (GstPad * pad, GstQuery * query)
GST_DEBUG_OBJECT (queue, "query buffering");
/* FIXME - is this condition correct? what should ring buffer do? */
- if (!(QUEUE_IS_USING_RING_BUFFER (queue)
- || QUEUE_IS_USING_TEMP_FILE (queue))) {
+ if (QUEUE_IS_USING_QUEUE (queue)) {
/* no temp file, just forward to the peer */
if (!gst_queue2_peer_query (queue, queue->sinkpad, query))
goto peer_failed;
@@ -2546,7 +2539,7 @@ gst_queue2_src_checkgetrange_function (GstPad * pad)
queue = GST_QUEUE2 (gst_pad_get_parent (pad));
/* we can operate in pull mode when we are using a tempfile */
- ret = QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue);
+ ret = !QUEUE_IS_USING_QUEUE (queue);
gst_object_unref (GST_OBJECT (queue));
@@ -2634,7 +2627,7 @@ gst_queue2_src_activate_pull (GstPad * pad, gboolean active)
queue = GST_QUEUE2 (gst_pad_get_parent (pad));
if (active) {
- if (QUEUE_IS_USING_RING_BUFFER (queue) || QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
/* open the temp file now */
result = gst_queue2_open_temp_location_file (queue);
@@ -2682,8 +2675,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- || QUEUE_IS_USING_TEMP_FILE (queue)) {
+ if (!QUEUE_IS_USING_QUEUE (queue)) {
if (!gst_queue2_open_temp_location_file (queue))
ret = GST_STATE_CHANGE_FAILURE;
}
@@ -2708,8 +2700,7 @@ gst_queue2_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
- if (QUEUE_IS_USING_RING_BUFFER (queue)
- || QUEUE_IS_USING_TEMP_FILE (queue))
+ if (!QUEUE_IS_USING_QUEUE (queue))
gst_queue2_close_temp_location_file (queue);
if (queue->starting_segment != NULL) {
gst_event_unref (queue->starting_segment);