summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--binaryurp/source/bridge.cxx205
-rw-r--r--binaryurp/source/bridge.hxx25
-rw-r--r--binaryurp/source/bridgefactory.cxx2
-rw-r--r--binaryurp/source/incomingrequest.cxx2
-rw-r--r--binaryurp/source/reader.cxx2
-rw-r--r--binaryurp/source/writer.cxx2
-rw-r--r--cppu/inc/uno/threadpool.h8
-rw-r--r--cppu/source/threadpool/thread.cxx184
-rw-r--r--cppu/source/threadpool/thread.hxx52
-rw-r--r--cppu/source/threadpool/threadpool.cxx124
-rw-r--r--cppu/source/threadpool/threadpool.hxx43
-rw-r--r--cppu/util/makefile.mk2
-rw-r--r--cpputools/source/unoexe/unoexe.cxx4
13 files changed, 372 insertions, 283 deletions
diff --git a/binaryurp/source/bridge.cxx b/binaryurp/source/bridge.cxx
index f591fe0e9e43..13987fe3bd51 100644
--- a/binaryurp/source/bridge.cxx
+++ b/binaryurp/source/bridge.cxx
@@ -105,11 +105,9 @@ extern "C" void SAL_CALL freeProxyCallback(uno_ExtEnvironment *, void * pProxy)
static_cast< Proxy * >(pProxy)->do_free();
}
-void joinThread(osl::Thread * thread) {
+bool isThread(osl::Thread * thread) {
assert(thread != 0);
- if (thread->getIdentifier() != osl::Thread::getCurrentIdentifier()) {
- thread->join();
- }
+ return osl::Thread::getCurrentIdentifier() == thread->getIdentifier();
}
class AttachThread: private boost::noncopyable {
@@ -213,8 +211,8 @@ Bridge::Bridge(
rtl::OUString(
RTL_CONSTASCII_USTRINGPARAM(
"com.sun.star.bridge.XProtocolProperties::commitChange"))),
- threadPool_(0), currentContextMode_(false), proxies_(0), calls_(0),
- normalCall_(false), activeCalls_(0), terminated_(false),
+ state_(STATE_INITIAL), threadPool_(0), currentContextMode_(false),
+ proxies_(0), calls_(0), normalCall_(false), activeCalls_(0),
mode_(MODE_REQUESTED)
{
assert(factory.is() && connection.is());
@@ -238,11 +236,14 @@ void Bridge::start() {
rtl::Reference< Writer > w(new Writer(this));
{
osl::MutexGuard g(mutex_);
- assert(threadPool_ == 0 && !writer_.is() && !reader_.is());
+ assert(
+ state_ == STATE_INITIAL && threadPool_ == 0 && !writer_.is() &&
+ !reader_.is());
threadPool_ = uno_threadpool_create();
assert(threadPool_ != 0);
reader_ = r;
writer_ = w;
+ state_ = STATE_STARTED;
}
// It is important to call reader_->create() last here; both
// Writer::execute and Reader::execute can call Bridge::terminate, but
@@ -254,60 +255,117 @@ void Bridge::start() {
r->create();
}
-void Bridge::terminate() {
+void Bridge::terminate(bool final) {
uno_ThreadPool tp;
- rtl::Reference< Reader > r;
- rtl::Reference< Writer > w;
- Listeners ls;
+ // Make sure function-local variables (Stubs s, etc.) are destroyed before
+ // the final uno_threadpool_destroy/threadPool_ = 0:
{
- osl::MutexGuard g(mutex_);
- if (terminated_) {
- return;
+ rtl::Reference< Reader > r;
+ rtl::Reference< Writer > w;
+ bool joinW;
+ Listeners ls;
+ {
+ osl::ClearableMutexGuard g(mutex_);
+ switch (state_) {
+ case STATE_INITIAL: // via ~Bridge -> dispose -> terminate
+ case STATE_FINAL:
+ return;
+ case STATE_STARTED:
+ break;
+ case STATE_TERMINATED:
+ if (final) {
+ g.clear();
+ terminated_.wait();
+ {
+ osl::MutexGuard g2(mutex_);
+ tp = threadPool_;
+ threadPool_ = 0;
+ assert(!(reader_.is() && isThread(reader_.get())));
+ std::swap(reader_, r);
+ assert(!(writer_.is() && isThread(writer_.get())));
+ std::swap(writer_, w);
+ state_ = STATE_FINAL;
+ }
+ assert(!(r.is() && w.is()));
+ if (r.is()) {
+ r->join();
+ } else if (w.is()) {
+ w->join();
+ }
+ if (tp != 0) {
+ uno_threadpool_destroy(tp);
+ }
+ }
+ return;
+ }
+ tp = threadPool_;
+ assert(!(final && isThread(reader_.get())));
+ if (!isThread(reader_.get())) {
+ std::swap(reader_, r);
+ }
+ w = writer_;
+ joinW = !isThread(writer_.get());
+ assert(!final || joinW);
+ if (joinW) {
+ writer_.clear();
+ }
+ ls.swap(listeners_);
+ state_ = final ? STATE_FINAL : STATE_TERMINATED;
+ }
+ try {
+ connection_->close();
+ } catch (const css::io::IOException & e) {
+ SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\'');
+ }
+ assert(w.is());
+ w->stop();
+ if (r.is()) {
+ r->join();
+ }
+ if (joinW) {
+ w->join();
+ }
+ assert(tp != 0);
+ uno_threadpool_dispose(tp);
+ Stubs s;
+ {
+ osl::MutexGuard g(mutex_);
+ s.swap(stubs_);
+ }
+ for (Stubs::iterator i(s.begin()); i != s.end(); ++i) {
+ for (Stub::iterator j(i->second.begin()); j != i->second.end(); ++j)
+ {
+ SAL_INFO(
+ "binaryurp",
+ "stub '" << i->first << "', '" << toString(j->first)
+ << "' still mapped at Bridge::terminate");
+ binaryUno_.get()->pExtEnv->revokeInterface(
+ binaryUno_.get()->pExtEnv, j->second.object.get());
+ }
+ }
+ factory_->removeBridge(this);
+ for (Listeners::iterator i(ls.begin()); i != ls.end(); ++i) {
+ try {
+ (*i)->disposing(
+ css::lang::EventObject(
+ static_cast< cppu::OWeakObject * >(this)));
+ } catch (const css::uno::RuntimeException & e) {
+ SAL_WARN(
+ "binaryurp",
+ "caught runtime exception '" << e.Message << '\'');
+ }
}
- tp = threadPool_;
- std::swap(reader_, r);
- std::swap(writer_, w);
- ls.swap(listeners_);
- terminated_ = true;
}
- try {
- connection_->close();
- } catch (const css::io::IOException & e) {
- SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\'');
- }
- assert(w.is());
- w->stop();
- joinThread(r.get());
- joinThread(w.get());
- assert(tp != 0);
- uno_threadpool_dispose(tp);
- Stubs s;
+ if (final) {
+ uno_threadpool_destroy(tp);
+ }
{
osl::MutexGuard g(mutex_);
- s.swap(stubs_);
- }
- for (Stubs::iterator i(s.begin()); i != s.end(); ++i) {
- for (Stub::iterator j(i->second.begin()); j != i->second.end(); ++j) {
- SAL_INFO(
- "binaryurp",
- "stub '" << i->first << "', '" << toString(j->first)
- << "' still mapped at Bridge::terminate");
- binaryUno_.get()->pExtEnv->revokeInterface(
- binaryUno_.get()->pExtEnv, j->second.object.get());
+ if (final) {
+ threadPool_ = 0;
}
}
- factory_->removeBridge(this);
- for (Listeners::iterator i(ls.begin()); i != ls.end(); ++i) {
- try {
- (*i)->disposing(
- css::lang::EventObject(
- static_cast< cppu::OWeakObject * >(this)));
- } catch (const css::uno::RuntimeException & e) {
- SAL_WARN(
- "binaryurp", "caught runtime exception '" << e.Message << '\'');
- }
- }
- uno_threadpool_destroy(tp);
+ terminated_.set();
}
css::uno::Reference< css::connection::XConnection > Bridge::getConnection()
@@ -339,19 +397,14 @@ BinaryAny Bridge::mapCppToBinaryAny(css::uno::Any const & cppAny) {
uno_ThreadPool Bridge::getThreadPool() {
osl::MutexGuard g(mutex_);
+ checkDisposed();
assert(threadPool_ != 0);
return threadPool_;
}
rtl::Reference< Writer > Bridge::getWriter() {
osl::MutexGuard g(mutex_);
- if (terminated_) {
- throw css::lang::DisposedException(
- rtl::OUString(
- RTL_CONSTASCII_USTRINGPARAM(
- "Binary URP bridge already disposed")),
- static_cast< cppu::OWeakObject * >(this));
- }
+ checkDisposed();
assert(writer_.is());
return writer_;
}
@@ -822,9 +875,15 @@ bool Bridge::isCurrentContextMode() {
}
Bridge::~Bridge() {
- if (getThreadPool() != 0) {
- terminate();
+#if OSL_DEBUG_LEVEL > 0
+ {
+ osl::MutexGuard g(mutex_);
+ SAL_WARN_IF(
+ state_ == STATE_STARTED || state_ == STATE_TERMINATED, "binaryurp",
+ "undisposed bridge, potential deadlock ahead");
}
+#endif
+ dispose();
}
css::uno::Reference< css::uno::XInterface > Bridge::getInstance(
@@ -885,7 +944,11 @@ rtl::OUString Bridge::getDescription() throw (css::uno::RuntimeException) {
}
void Bridge::dispose() throw (css::uno::RuntimeException) {
- terminate();
+ // For terminate(true) not to deadlock, an external protocol must ensure
+ // that dispose is not called from a thread pool worker thread (that dispose
+ // is never called from the reader or writer thread is already ensured
+ // internally):
+ terminate(true);
// OOo expects dispose to not return while there are still remote calls in
// progress; an external protocol must ensure that dispose is not called
// from within an incoming or outgoing remote call, as passive_.wait() would
@@ -900,7 +963,8 @@ void Bridge::addEventListener(
assert(xListener.is());
{
osl::MutexGuard g(mutex_);
- if (!terminated_) {
+ assert(state_ != STATE_INITIAL);
+ if (state_ == STATE_STARTED) {
listeners_.push_back(xListener);
return;
}
@@ -995,7 +1059,18 @@ void Bridge::terminateWhenUnused(bool unused) {
// That the current thread considers the bridge unused implies that it
// is not within an incoming or outgoing remote call (so calling
// terminate cannot lead to deadlock):
- terminate();
+ terminate(false);
+ }
+}
+
+void Bridge::checkDisposed() {
+ assert(state_ != STATE_INITIAL);
+ if (state_ != STATE_STARTED) {
+ throw css::lang::DisposedException(
+ rtl::OUString(
+ RTL_CONSTASCII_USTRINGPARAM(
+ "Binary URP bridge already disposed")),
+ static_cast< cppu::OWeakObject * >(this));
}
}
diff --git a/binaryurp/source/bridge.hxx b/binaryurp/source/bridge.hxx
index 8d667897d253..3ffbfbaeb43b 100644
--- a/binaryurp/source/bridge.hxx
+++ b/binaryurp/source/bridge.hxx
@@ -93,8 +93,11 @@ public:
void start();
// Internally waits for all incoming and outgoing remote calls to terminate,
- // so must not be called from within such a call:
- void terminate();
+ // so must not be called from within such a call; when final is true, also
+ // joins all remaining threads (reader, writer, and worker threads from the
+ // thread pool), so must not be called with final set to true from such a
+ // thread:
+ void terminate(bool final);
com::sun::star::uno::Reference< com::sun::star::connection::XConnection >
getConnection() const;
@@ -228,6 +231,9 @@ private:
void terminateWhenUnused(bool unused);
+ // Must only be called with mutex_ locked:
+ void checkDisposed();
+
typedef
std::list<
com::sun::star::uno::Reference<
@@ -240,6 +246,8 @@ private:
typedef std::map< rtl::OUString, Stub > Stubs;
+ enum State { STATE_INITIAL, STATE_STARTED, STATE_TERMINATED, STATE_FINAL };
+
enum Mode {
MODE_REQUESTED, MODE_REPLY_MINUS1, MODE_REPLY_0, MODE_REPLY_1,
MODE_WAIT, MODE_NORMAL, MODE_NORMAL_WAIT };
@@ -259,8 +267,15 @@ private:
com::sun::star::uno::TypeDescription protPropRequest_;
com::sun::star::uno::TypeDescription protPropCommit_;
OutgoingRequests outgoingRequests_;
+ osl::Condition passive_;
+ // to guarantee that passive_ is eventually set (to avoid deadlock, see
+ // dispose), activeCalls_ only counts those calls for which it can be
+ // guaranteed that incrementActiveCalls is indeed followed by
+ // decrementActiveCalls, without an intervening exception
+ osl::Condition terminated_;
osl::Mutex mutex_;
+ State state_;
Listeners listeners_;
uno_ThreadPool threadPool_;
rtl::Reference< Writer > writer_;
@@ -271,12 +286,6 @@ private:
std::size_t calls_;
bool normalCall_;
std::size_t activeCalls_;
- osl::Condition passive_;
- // to guarantee that passive_ is eventually set (to avoid deadlock, see
- // dispose), activeCalls_ only counts those calls for which it can be
- // guaranteed that incrementActiveCalls is indeed followed by
- // decrementActiveCalls, without an intervening exception
- bool terminated_;
// Only accessed from reader_ thread:
Mode mode_;
diff --git a/binaryurp/source/bridgefactory.cxx b/binaryurp/source/bridgefactory.cxx
index 5c9105585916..8f4caa64e17d 100644
--- a/binaryurp/source/bridgefactory.cxx
+++ b/binaryurp/source/bridgefactory.cxx
@@ -212,7 +212,7 @@ static cppu::ImplementationEntry const services[] = {
{ &binaryurp::BridgeFactory::static_create,
&binaryurp::BridgeFactory::static_getImplementationName,
&binaryurp::BridgeFactory::static_getSupportedServiceNames,
- &cppu::createSingleComponentFactory, 0, 0 },
+ &cppu::createOneInstanceComponentFactory, 0, 0 },
{ 0, 0, 0, 0, 0, 0 }
};
diff --git a/binaryurp/source/incomingrequest.cxx b/binaryurp/source/incomingrequest.cxx
index 431c88505ad1..83b0030623e7 100644
--- a/binaryurp/source/incomingrequest.cxx
+++ b/binaryurp/source/incomingrequest.cxx
@@ -123,7 +123,7 @@ void IncomingRequest::execute() const {
} catch (const std::exception & e) {
OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what());
}
- bridge_->terminate();
+ bridge_->terminate(false);
} else {
if (isExc) {
OSL_TRACE(OSL_LOG_PREFIX "oneway method raised exception");
diff --git a/binaryurp/source/reader.cxx b/binaryurp/source/reader.cxx
index c052fadc5629..47bdfbf065fc 100644
--- a/binaryurp/source/reader.cxx
+++ b/binaryurp/source/reader.cxx
@@ -150,7 +150,7 @@ void Reader::run() {
} catch (const std::exception & e) {
SAL_WARN("binaryurp", "caught C++ exception '" << e.what() << '\'');
}
- bridge_->terminate();
+ bridge_->terminate(false);
}
void Reader::onTerminated() {
diff --git a/binaryurp/source/writer.cxx b/binaryurp/source/writer.cxx
index 5aca57b108eb..69825e13bc37 100644
--- a/binaryurp/source/writer.cxx
+++ b/binaryurp/source/writer.cxx
@@ -196,7 +196,7 @@ void Writer::run() {
} catch (const std::exception & e) {
OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what());
}
- bridge_->terminate();
+ bridge_->terminate(false);
}
void Writer::onTerminated() {
diff --git a/cppu/inc/uno/threadpool.h b/cppu/inc/uno/threadpool.h
index 0c647442e377..77bb712e53b4 100644
--- a/cppu/inc/uno/threadpool.h
+++ b/cppu/inc/uno/threadpool.h
@@ -166,10 +166,6 @@ uno_threadpool_putJob(
return immeadiatly with *ppJob == 0.
@param hPool The handle to be disposed.
- In case, hPool is 0, this function joins on all threads created
- by the threadpool administration. This may e.g. used to ensure, that
- no threads are inside the cppu library anymore, in case it needs to get
- unloaded.
This function is called i.e. by a bridge, that is forced to dispose itself.
*/
@@ -180,6 +176,10 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C();
/** Releases the previously with uno_threadpool_create() created handle.
The handle thus becomes invalid. It is an error to use the handle after
uno_threadpool_destroy().
+
+ A call to uno_threadpool_destroy can synchronously join on spawned worker
+ threads, so this function must never be called from such a worker thread.
+
@see uno_threadpool_create()
*/
void SAL_CALL
diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx
index cc22a453c79d..12ea09a131ee 100644
--- a/cppu/source/threadpool/thread.cxx
+++ b/cppu/source/threadpool/thread.cxx
@@ -33,7 +33,6 @@
#include <com/sun/star/lang/DisposedException.hpp>
#include <com/sun/star/uno/Reference.hxx>
#include <com/sun/star/uno/XInterface.hpp>
-#include <rtl/instance.hxx>
#include <rtl/ustring.h>
#include <rtl/ustring.hxx>
@@ -48,17 +47,6 @@ namespace css = com::sun::star;
}
using namespace osl;
-extern "C" {
-
-void SAL_CALL cppu_requestThreadWorker( void *pVoid )
-{
- ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
-
- pThread->run();
- pThread->onTerminated();
-}
-
-}
namespace cppu_threadpool {
@@ -75,7 +63,7 @@ namespace cppu_threadpool {
#endif
}
- void ThreadAdmin::add( ORequestThread *p )
+ void ThreadAdmin::add( rtl::Reference< ORequestThread > const & p )
{
MutexGuard aGuard( m_mutex );
if( m_disposed )
@@ -90,12 +78,19 @@ namespace cppu_threadpool {
m_lst.push_back( p );
}
- void ThreadAdmin::remove( ORequestThread * p )
+ void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p )
+ {
+ ::std::list< rtl::Reference< ORequestThread > >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
+ if( ii != m_lst.end() )
+ {
+ m_lst.erase( ii );
+ }
+ }
+
+ void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p )
{
MutexGuard aGuard( m_mutex );
- ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
- OSL_ASSERT( ii != m_lst.end() );
- m_lst.erase( ii );
+ remove_locked( p );
}
void ThreadAdmin::join()
@@ -104,62 +99,34 @@ namespace cppu_threadpool {
MutexGuard aGuard( m_mutex );
m_disposed = true;
}
- ORequestThread *pCurrent;
- do
+ for (;;)
{
- pCurrent = 0;
+ rtl::Reference< ORequestThread > pCurrent;
{
MutexGuard aGuard( m_mutex );
- if( ! m_lst.empty() )
+ if( m_lst.empty() )
{
- pCurrent = m_lst.front();
- pCurrent->setDeleteSelf( sal_False );
+ break;
}
+ pCurrent = m_lst.front();
+ m_lst.pop_front();
}
- if ( pCurrent )
- {
- pCurrent->join();
- delete pCurrent;
- }
- } while( pCurrent );
- }
-
- struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
- {
- ThreadAdminHolder operator () () {
- ThreadAdminHolder aRet(new ThreadAdmin());
- return aRet;
+ pCurrent->join();
}
- };
-
- ThreadAdminHolder& ThreadAdmin::getInstance()
- {
- return theThreadAdmin::get();
}
// ----------------------------------------------------------------------------------
- ORequestThread::ORequestThread( JobQueue *pQueue,
+ ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool,
+ JobQueue *pQueue,
const ByteSequence &aThreadId,
sal_Bool bAsynchron )
- : m_thread( 0 )
- , m_aThreadAdmin( ThreadAdmin::getInstance() )
+ : m_aThreadPool( aThreadPool )
, m_pQueue( pQueue )
, m_aThreadId( aThreadId )
, m_bAsynchron( bAsynchron )
- , m_bDeleteSelf( sal_True )
- {
- m_aThreadAdmin->add( this );
- }
-
-
- ORequestThread::~ORequestThread()
- {
- if (m_thread != 0)
- {
- osl_destroyThread(m_thread);
- }
- }
+ {}
+ ORequestThread::~ORequestThread() {}
void ORequestThread::setTask( JobQueue *pQueue,
const ByteSequence &aThreadId,
@@ -170,74 +137,81 @@ namespace cppu_threadpool {
m_bAsynchron = bAsynchron;
}
- sal_Bool ORequestThread::create()
+ void ORequestThread::launch()
{
- OSL_ASSERT(m_thread == 0); // only one running thread per instance
-
- m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this);
- if ( m_thread )
- {
- osl_resumeThread( m_thread );
+ // Assumption is that osl::Thread::create returns normally with a true
+ // return value iff it causes osl::Thread::run to start executing:
+ acquire();
+ ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin();
+ osl::ClearableMutexGuard g(rThreadAdmin.m_mutex);
+ rThreadAdmin.add( this );
+ try {
+ if (!create()) {
+ throw std::runtime_error("osl::Thread::create failed");
+ }
+ } catch (...) {
+ rThreadAdmin.remove_locked( this );
+ g.clear();
+ release();
+ throw;
}
-
- return m_thread != 0;
- }
-
- void ORequestThread::join()
- {
- osl_joinWithThread( m_thread );
}
void ORequestThread::onTerminated()
{
- m_aThreadAdmin->remove( this );
- if( m_bDeleteSelf )
- {
- delete this;
- }
+ m_aThreadPool->getThreadAdmin().remove( this );
+ release();
}
void ORequestThread::run()
{
- ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance();
-
- while ( m_pQueue )
+ try
{
- if( ! m_bAsynchron )
+ while ( m_pQueue )
{
- if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
+ if( ! m_bAsynchron )
{
- OSL_ASSERT( false );
+ if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
+ {
+ OSL_ASSERT( false );
+ }
}
- }
- while( ! m_pQueue->isEmpty() )
- {
- // Note : Oneways should not get a disposable disposeid,
- // It does not make sense to dispose a call in this state.
- // That's way we put it an disposeid, that can't be used otherwise.
- m_pQueue->enter(
- sal::static_int_cast< sal_Int64 >(
- reinterpret_cast< sal_IntPtr >(this)),
- sal_True );
-
- if( m_pQueue->isEmpty() )
+ while( ! m_pQueue->isEmpty() )
{
- theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
- // Note : revokeQueue might have failed because m_pQueue.isEmpty()
- // may be false (race).
+ // Note : Oneways should not get a disposable disposeid,
+ // It does not make sense to dispose a call in this state.
+ // That's way we put it an disposeid, that can't be used otherwise.
+ m_pQueue->enter(
+ sal::static_int_cast< sal_Int64 >(
+ reinterpret_cast< sal_IntPtr >(this)),
+ sal_True );
+
+ if( m_pQueue->isEmpty() )
+ {
+ m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
+ // Note : revokeQueue might have failed because m_pQueue.isEmpty()
+ // may be false (race).
+ }
}
- }
- delete m_pQueue;
- m_pQueue = 0;
+ delete m_pQueue;
+ m_pQueue = 0;
- if( ! m_bAsynchron )
- {
- uno_releaseIdFromCurrentThread();
- }
+ if( ! m_bAsynchron )
+ {
+ uno_releaseIdFromCurrentThread();
+ }
- theThreadPool->waitInPool( this );
+ m_aThreadPool->waitInPool( this );
+ }
+ }
+ catch (...)
+ {
+ // Work around the problem that onTerminated is not called if run
+ // throws an exception:
+ onTerminated();
+ throw;
}
}
}
diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx
index a3ea45aadaed..88f3d91f8722 100644
--- a/cppu/source/threadpool/thread.hxx
+++ b/cppu/source/threadpool/thread.hxx
@@ -28,63 +28,49 @@
#ifndef _CPPU_THREADPOOL_THREAD_HXX
#define _CPPU_THREADPOOL_THREAD_HXX
-#include <list>
+#include <osl/thread.hxx>
#include <sal/types.h>
-
-#include <osl/thread.h>
+#include <salhelper/simplereferenceobject.hxx>
#include "jobqueue.hxx"
+#include "threadpool.hxx"
namespace cppu_threadpool {
class JobQueue;
- class ThreadAdmin;
- typedef boost::shared_ptr<ThreadAdmin> ThreadAdminHolder;
//-----------------------------------------
// private thread class for the threadpool
// independent from vos
//-----------------------------------------
- class ORequestThread
+ class ORequestThread:
+ public salhelper::SimpleReferenceObject, public osl::Thread
{
public:
- ORequestThread( JobQueue * ,
+ ORequestThread( ThreadPoolHolder const &aThreadPool,
+ JobQueue * ,
const ::rtl::ByteSequence &aThreadId,
sal_Bool bAsynchron );
- ~ORequestThread();
+ virtual ~ORequestThread();
void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , sal_Bool bAsynchron );
- sal_Bool create();
- void join();
- void onTerminated();
- void run();
- inline void setDeleteSelf( sal_Bool b )
- { m_bDeleteSelf = b; }
+ void launch();
+
+ static inline void * operator new(std::size_t size)
+ { return SimpleReferenceObject::operator new(size); }
+
+ static inline void operator delete(void * pointer)
+ { SimpleReferenceObject::operator delete(pointer); }
private:
- oslThread m_thread;
- ThreadAdminHolder m_aThreadAdmin;
+ virtual void SAL_CALL run();
+ virtual void SAL_CALL onTerminated();
+
+ ThreadPoolHolder m_aThreadPool;
JobQueue *m_pQueue;
::rtl::ByteSequence m_aThreadId;
sal_Bool m_bAsynchron;
- sal_Bool m_bDeleteSelf;
- };
-
- class ThreadAdmin
- {
- public:
- ThreadAdmin();
- ~ThreadAdmin ();
- static ThreadAdminHolder &getInstance();
- void add( ORequestThread * );
- void remove( ORequestThread * );
- void join();
-
- private:
- ::osl::Mutex m_mutex;
- ::std::list< ORequestThread * > m_lst;
- bool m_disposed;
};
} // end cppu_threadpool
diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx
index f9f0be6c7d03..9099b7e1981e 100644
--- a/cppu/source/threadpool/threadpool.cxx
+++ b/cppu/source/threadpool/threadpool.cxx
@@ -109,15 +109,6 @@ namespace cppu_threadpool
//-------------------------------------------------------------------------------
- struct theThreadPool :
- public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
- {
- ThreadPoolHolder operator () () {
- ThreadPoolHolder aRet(new ThreadPool());
- return aRet;
- }
- };
-
ThreadPool::ThreadPool()
{
m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
@@ -132,46 +123,24 @@ namespace cppu_threadpool
}
#endif
}
- ThreadPoolHolder ThreadPool::getInstance()
- {
- return theThreadPool::get();
- }
-
void ThreadPool::dispose( sal_Int64 nDisposeId )
{
- if( nDisposeId )
- {
- m_DisposedCallerAdmin->dispose( nDisposeId );
+ m_DisposedCallerAdmin->dispose( nDisposeId );
- MutexGuard guard( m_mutex );
- for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
- ii != m_mapQueue.end();
- ++ii)
+ MutexGuard guard( m_mutex );
+ for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
+ ii != m_mapQueue.end();
+ ++ii)
+ {
+ if( (*ii).second.first )
{
- if( (*ii).second.first )
- {
- (*ii).second.first->dispose( nDisposeId );
- }
- if( (*ii).second.second )
- {
- (*ii).second.second->dispose( nDisposeId );
- }
+ (*ii).second.first->dispose( nDisposeId );
}
- }
- else
- {
+ if( (*ii).second.second )
{
- MutexGuard guard( m_mutexWaitingThreadList );
- for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
- ii != m_lstThreads.end() ;
- ++ ii )
- {
- // wake the threads up
- osl_setCondition( (*ii)->condition );
- }
+ (*ii).second.second->dispose( nDisposeId );
}
- ThreadAdmin::getInstance()->join();
}
}
@@ -185,7 +154,7 @@ namespace cppu_threadpool
* a new request comes in, this thread is reused. This is done only to improve performance,
* it is not required for threadpool functionality.
******************/
- void ThreadPool::waitInPool( ORequestThread * pThread )
+ void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
{
struct WaitingThread waitingThread;
waitingThread.condition = osl_createCondition();
@@ -201,7 +170,7 @@ namespace cppu_threadpool
{
MutexGuard guard ( m_mutexWaitingThreadList );
- if( waitingThread.thread )
+ if( waitingThread.thread.is() )
{
// thread wasn't reused, remove it from the list
WaitingThreadList::iterator ii = find(
@@ -214,6 +183,21 @@ namespace cppu_threadpool
osl_destroyCondition( waitingThread.condition );
}
+ void ThreadPool::joinWorkers()
+ {
+ {
+ MutexGuard guard( m_mutexWaitingThreadList );
+ for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
+ ii != m_lstThreads.end() ;
+ ++ ii )
+ {
+ // wake the threads up
+ osl_setCondition( (*ii)->condition );
+ }
+ }
+ m_aThreadAdmin.join();
+ }
+
void ThreadPool::createThread( JobQueue *pQueue ,
const ByteSequence &aThreadId,
sal_Bool bAsynchron )
@@ -240,10 +224,9 @@ namespace cppu_threadpool
if( bCreate )
{
- ORequestThread *pThread =
- new ORequestThread( pQueue , aThreadId, bAsynchron);
- // deletes itself !
- pThread->create();
+ rtl::Reference< ORequestThread > pThread(
+ new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
+ pThread->launch();
}
}
@@ -385,6 +368,12 @@ namespace cppu_threadpool
}
}
+// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
+// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
+// (within the last uno_threadpool_destroy) all worker threads spawned by that
+// ThreadPool instance are joined (which implies that uno_threadpool_destroy
+// must never be called from a worker thread); afterwards, the next call to
+// uno_threadpool_create (if any) will lead to a new ThreadPool instance.
using namespace cppu_threadpool;
@@ -415,27 +404,47 @@ struct _uno_ThreadPool
sal_Int32 dummy;
};
+namespace {
+
+ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
+{
+ MutexGuard guard( Mutex::getGlobalMutex() );
+ assert( g_pThreadpoolHashSet != 0 );
+ ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
+ assert( i != g_pThreadpoolHashSet->end() );
+ return i->second;
+}
+
+}
+
extern "C" uno_ThreadPool SAL_CALL
uno_threadpool_create() SAL_THROW_EXTERN_C()
{
MutexGuard guard( Mutex::getGlobalMutex() );
+ ThreadPoolHolder p;
if( ! g_pThreadpoolHashSet )
{
g_pThreadpoolHashSet = new ThreadpoolHashSet();
+ p = new ThreadPool;
+ }
+ else
+ {
+ assert( !g_pThreadpoolHashSet->empty() );
+ p = g_pThreadpoolHashSet->begin()->second;
}
// Just ensure that the handle is unique in the process (via heap)
uno_ThreadPool h = new struct _uno_ThreadPool;
- g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) );
+ g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
return h;
}
extern "C" void SAL_CALL
-uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
+uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
{
sal_Sequence *pThreadId = 0;
uno_getIdOfCurrentThread( &pThreadId );
- ThreadPool::getInstance()->prepare( pThreadId );
+ getThreadPool( hPool )->prepare( pThreadId );
rtl_byte_sequence_release( pThreadId );
uno_releaseIdFromCurrentThread();
}
@@ -447,7 +456,7 @@ uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
sal_Sequence *pThreadId = 0;
uno_getIdOfCurrentThread( &pThreadId );
*ppJob =
- ThreadPool::getInstance()->enter(
+ getThreadPool( hPool )->enter(
pThreadId,
sal::static_int_cast< sal_Int64 >(
reinterpret_cast< sal_IntPtr >(hPool)) );
@@ -463,19 +472,19 @@ uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
extern "C" void SAL_CALL
uno_threadpool_putJob(
- uno_ThreadPool,
+ uno_ThreadPool hPool,
sal_Sequence *pThreadId,
void *pJob,
void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
{
- ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest );
+ getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
}
extern "C" void SAL_CALL
uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
{
- ThreadPool::getInstance()->dispose(
+ getThreadPool(hPool)->dispose(
sal::static_int_cast< sal_Int64 >(
reinterpret_cast< sal_IntPtr >(hPool)) );
}
@@ -483,9 +492,8 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
extern "C" void SAL_CALL
uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
{
- assert(hPool != 0);
-
- ThreadPool::getInstance()->destroy(
+ ThreadPoolHolder p( getThreadPool(hPool) );
+ p->destroy(
sal::static_int_cast< sal_Int64 >(
reinterpret_cast< sal_IntPtr >(hPool)) );
@@ -510,7 +518,7 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
if( empty )
{
- uno_threadpool_dispose( 0 );
+ p->joinWorkers();
}
}
diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx
index 18bb47a1ff20..3ff52b362af1 100644
--- a/cppu/source/threadpool/threadpool.hxx
+++ b/cppu/source/threadpool/threadpool.hxx
@@ -25,11 +25,19 @@
* for a copy of the LGPLv3 License.
*
************************************************************************/
+
+#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX
+#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX
+
+#include <list>
+
#include <boost/unordered_map.hpp>
#include <osl/conditn.h>
#include <rtl/byteseq.hxx>
+#include <rtl/ref.hxx>
+#include <salhelper/simplereferenceobject.hxx>
#include <boost/shared_ptr.hpp>
@@ -74,7 +82,7 @@ namespace cppu_threadpool {
struct WaitingThread
{
oslCondition condition;
- ORequestThread *thread;
+ rtl::Reference< ORequestThread > thread;
};
typedef ::std::list < struct ::cppu_threadpool::WaitingThread * > WaitingThreadList;
@@ -98,15 +106,32 @@ namespace cppu_threadpool {
DisposedCallerList m_lst;
};
+ class ThreadAdmin
+ {
+ public:
+ ThreadAdmin();
+ ~ThreadAdmin ();
+
+ void add( rtl::Reference< ORequestThread > const & );
+ void remove( rtl::Reference< ORequestThread > const & );
+ void join();
+
+ void remove_locked( rtl::Reference< ORequestThread > const & );
+ ::osl::Mutex m_mutex;
+
+ private:
+ ::std::list< rtl::Reference< ORequestThread > > m_lst;
+ bool m_disposed;
+ };
+
class ThreadPool;
- typedef boost::shared_ptr<ThreadPool> ThreadPoolHolder;
+ typedef rtl::Reference<ThreadPool> ThreadPoolHolder;
- class ThreadPool
+ class ThreadPool: public salhelper::SimpleReferenceObject
{
public:
ThreadPool();
~ThreadPool();
- static ThreadPoolHolder getInstance();
void dispose( sal_Int64 nDisposeId );
void destroy( sal_Int64 nDisposeId );
@@ -124,7 +149,12 @@ namespace cppu_threadpool {
********/
sal_Bool revokeQueue( const ByteSequence & aThreadId , sal_Bool bAsynchron );
- void waitInPool( ORequestThread *pThread );
+ void waitInPool( rtl::Reference< ORequestThread > const & pThread );
+
+ void joinWorkers();
+
+ ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; }
+
private:
void createThread( JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron);
@@ -136,8 +166,11 @@ namespace cppu_threadpool {
WaitingThreadList m_lstThreads;
DisposedCallerAdminHolder m_DisposedCallerAdmin;
+ ThreadAdmin m_aThreadAdmin;
};
} // end namespace cppu_threadpool
+#endif
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/util/makefile.mk b/cppu/util/makefile.mk
index ea0c09488c85..cf8e1bd5133a 100644
--- a/cppu/util/makefile.mk
+++ b/cppu/util/makefile.mk
@@ -56,7 +56,7 @@ SHL1TARGET=$(TARGET)$(UDK_MAJOR)
SHL1TARGET= uno_$(TARGET)
.ENDIF
-SHL1STDLIBS = $(SALLIB)
+SHL1STDLIBS = $(SALLIB) $(SALHELPERLIB)
SHL1DEPN=
.IF "$(COM)" == "MSC"
diff --git a/cpputools/source/unoexe/unoexe.cxx b/cpputools/source/unoexe/unoexe.cxx
index 684bc39934e1..46fcbf8b310a 100644
--- a/cpputools/source/unoexe/unoexe.cxx
+++ b/cpputools/source/unoexe/unoexe.cxx
@@ -847,6 +847,10 @@ SAL_IMPLEMENT_MAIN_WITH_ARGS(argc,)
if (! xComp.is())
throw RuntimeException( OUString( RTL_CONSTASCII_USTRINGPARAM("bridge factory does not export interface \"com.sun.star.lang.XComponent\"!" ) ), Reference< XInterface >() );
ODisposingListener::waitFor( xComp );
+ xComp->dispose();
+ // explicitly dispose the remote bridge so that it joins
+ // on all spawned threads before process exit (see
+ // binaryurp/source/bridge.cxx for details)
break;
}
}