summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorErik Walthinsen <omega@temple-baptist.org>2001-05-25 00:29:25 +0000
committerErik Walthinsen <omega@temple-baptist.org>2001-05-25 00:29:25 +0000
commita7cd58c92eb9e6a7f9f8e0de8c3b94960f952ad6 (patch)
tree35217bb8323af3f0893557dd94b44fbcf427839f
parenta74b02deb87f4bbeede70b91bacd14045e4d8bad (diff)
fixed some interruptability problems with thread and queue
Original commit message from CVS: fixed some interruptability problems with thread and queue
-rw-r--r--gst/gstpad.c8
-rw-r--r--gst/gstqueue.c18
-rw-r--r--gst/gstqueue.h1
-rw-r--r--gst/gstscheduler.h2
-rw-r--r--gst/gstthread.c34
-rw-r--r--plugins/elements/gstqueue.c18
-rw-r--r--plugins/elements/gstqueue.h1
7 files changed, 67 insertions, 15 deletions
diff --git a/gst/gstpad.c b/gst/gstpad.c
index 47441d91aa..d073693f52 100644
--- a/gst/gstpad.c
+++ b/gst/gstpad.c
@@ -1438,9 +1438,9 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
{
GstRealPad *peer = GST_RPAD_PEER (pad);
- g_return_if_fail (peer != NULL);
-
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
+
+ g_return_if_fail (peer != NULL);
if (peer->pushfunc) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling pushfunc &%s of peer pad %s:%s\n",
@@ -1465,10 +1465,10 @@ gst_pad_pull (GstPad *pad)
{
GstRealPad *peer = GST_RPAD_PEER(pad);
- g_return_val_if_fail (peer != NULL, NULL);
-
GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
+ g_return_val_if_fail (peer != NULL, NULL);
+
if (peer->pullfunc) {
GST_DEBUG (GST_CAT_DATAFLOW,"calling pullfunc %s of peer pad %s:%s\n",
GST_DEBUG_FUNCPTR_NAME(peer->pullfunc),GST_DEBUG_PAD_NAME(peer));
diff --git a/gst/gstqueue.c b/gst/gstqueue.c
index 7c255b97cd..896d240845 100644
--- a/gst/gstqueue.c
+++ b/gst/gstqueue.c
@@ -323,8 +323,15 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
- GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
+GST_STATE_NONE_PENDING)
{
+ GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
+ if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue);
cothread_switch(cothread_current_main());
}
@@ -386,8 +393,15 @@ gst_queue_get (GstPad *pad)
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
- GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
+GST_STATE_NONE_PENDING)
{
+ GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
+ if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue);
cothread_switch(cothread_current_main());
}
diff --git a/gst/gstqueue.h b/gst/gstqueue.h
index 5f695f6667..085d5ac122 100644
--- a/gst/gstqueue.h
+++ b/gst/gstqueue.h
@@ -75,6 +75,7 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */
+// GMutex *lock; (optimization?)
GCond *emptycond;
GCond *fullcond;
diff --git a/gst/gstscheduler.h b/gst/gstscheduler.h
index d980c21c0f..cdc9052219 100644
--- a/gst/gstscheduler.h
+++ b/gst/gstscheduler.h
@@ -45,6 +45,8 @@ extern "C" {
(GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_SCHEDULE))
+#define GST_SCHED_PARENT(sched) ((sched)->parent)
+
//typedef struct _GstSchedule GstSchedule;
//typedef struct _GstScheduleClass GstScheduleClass;
typedef struct _GstScheduleChain GstScheduleChain;
diff --git a/gst/gstthread.c b/gst/gstthread.c
index aec3f4bb44..87c20d699a 100644
--- a/gst/gstthread.c
+++ b/gst/gstthread.c
@@ -250,6 +250,7 @@ gst_thread_change_state (GstElement *element)
gboolean stateset = GST_STATE_SUCCESS;
gint transition;
pthread_t self = pthread_self();
+ GstElement *peerelement;
g_return_val_if_fail (GST_IS_THREAD(element), FALSE);
// GST_DEBUG_ENTER("(\"%s\")",GST_ELEMENT_NAME(element));
@@ -328,10 +329,10 @@ gst_thread_change_state (GstElement *element)
if (pthread_equal(self, thread->thread_id))
{
//FIXME this should not happen
- g_assert(!pthread_equal(self, thread->thread_id));
GST_DEBUG(GST_CAT_THREAD,"no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to paused\n",
GST_DEBUG_THREAD_ARGS(thread->pid));
GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
+ g_assert(!pthread_equal(self, thread->thread_id));
}
else
{
@@ -357,8 +358,10 @@ gst_thread_change_state (GstElement *element)
//
//FIXME also make this more efficient by keeping list of managed queues
THR_DEBUG("waking queue \"%s\"\n",GST_ELEMENT_NAME(e));
+ GST_LOCK(e);
g_cond_signal((GST_QUEUE(e)->emptycond));
g_cond_signal((GST_QUEUE(e)->fullcond));
+ GST_UNLOCK(e);
}
else
{
@@ -367,16 +370,32 @@ gst_thread_change_state (GstElement *element)
{
GstPad *p = GST_PAD(pads->data);
pads = g_list_next(pads);
- if (GST_IS_REAL_PAD(p) &&
- GST_ELEMENT_SCHED(e) != GST_ELEMENT_SCHED(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p)))))
+
+ peerelement = GST_PAD_PARENT(GST_PAD_PEER(p));
+ if (!peerelement) continue; // deal with case where there's no peer
+
+ if (!GST_FLAG_IS_SET(peerelement,GST_ELEMENT_DECOUPLED)) {
+ GST_DEBUG(GST_CAT_THREAD,"peer element isn't DECOUPLED\n");
+ continue;
+ }
+
+ // FIXME this needs to go away eventually
+ if (!GST_IS_QUEUE(peerelement)) {
+ GST_DEBUG(GST_CAT_THREAD,"peer element isn't a Queue\n");
+ continue;
+ }
+
+ if (GST_ELEMENT_SCHED(peerelement) != GST_ELEMENT_SCHED(thread))
+// GST_ELEMENT_SCHED(e) != GST_ELEMENT_SCHED(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p)))))
{
THR_DEBUG(" element \"%s\" has pad cross sched boundary\n",GST_ELEMENT_NAME(e));
// FIXME i assume this signals our own (current) thread so don't need to lock
// FIXME however, this *may* go to yet another thread for which we need locks
// FIXME i'm too tired to deal with this now
- g_cond_signal(GST_QUEUE(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))->emptycond);
- g_cond_signal(GST_QUEUE(GST_ELEMENT(GST_PAD_PARENT(GST_PAD_PEER(p))))->fullcond);
-
+ GST_LOCK(peerelement);
+ g_cond_signal(GST_QUEUE(peerelement)->emptycond);
+ g_cond_signal(GST_QUEUE(peerelement)->fullcond);
+ GST_UNLOCK(peerelement);
}
}
}
@@ -396,6 +415,7 @@ gst_thread_change_state (GstElement *element)
}
else
{
+ // FIXME FIXME we need to interrupt, or reorder the states!
THR_DEBUG("telling thread to pause (ready)\n");
g_mutex_lock(thread->lock);
gst_thread_signal_thread(thread,FALSE);
@@ -486,7 +506,7 @@ gst_thread_main_loop (void *arg)
THR_DEBUG_MAIN("parent thread has signaled back at top of while\n");
// now is a good time to change the state of the children and the thread itself
gst_thread_update_state (thread);
- THR_DEBUG_MAIN("doe changing state, signaling back to parent process\n");
+ THR_DEBUG_MAIN("done changing state, signaling back to parent process\n");
g_cond_signal (thread->cond);
g_mutex_unlock (thread->lock);
THR_DEBUG_MAIN("done syncing with parent process at top of while\n");
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index 7c255b97cd..896d240845 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -323,8 +323,15 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
- GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->sinkpad))))) !=
+GST_STATE_NONE_PENDING)
{
+ GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
+ if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue);
cothread_switch(cothread_current_main());
}
@@ -386,8 +393,15 @@ gst_queue_get (GstPad *pad)
// if there's a pending state change for this queue or its manager, switch
// back to iterator so bottom half of state change executes
if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING ||
- GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+// GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+GST_STATE_PENDING(GST_SCHED_PARENT(GST_ELEMENT_SCHED(GST_PAD_PARENT(GST_PAD_PEER(queue->srcpad))))) !=
+GST_STATE_NONE_PENDING)
{
+ GST_DEBUG(GST_CAT_DATAFLOW,"interrupted!!\n");
+ if (GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(queue) != GST_STATE_NONE_PENDING)\n");
+ if (GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING)
+ GST_DEBUG(GST_CAT_DATAFLOW,"GST_STATE_PENDING(GST_SCHEDULE(GST_ELEMENT(queue)->sched)->parent) != GST_STATE_NONE_PENDING\n");
GST_UNLOCK(queue);
cothread_switch(cothread_current_main());
}
diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h
index 5f695f6667..085d5ac122 100644
--- a/plugins/elements/gstqueue.h
+++ b/plugins/elements/gstqueue.h
@@ -75,6 +75,7 @@ struct _GstQueue {
gint leaky; /* whether the queue is leaky, and if so at which end */
+// GMutex *lock; (optimization?)
GCond *emptycond;
GCond *fullcond;