diff options
Diffstat (limited to 'cppu')
-rw-r--r-- | cppu/inc/uno/threadpool.h | 8 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.cxx | 184 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.hxx | 52 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.cxx | 124 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.hxx | 43 | ||||
-rw-r--r-- | cppu/util/makefile.mk | 2 |
6 files changed, 207 insertions, 206 deletions
diff --git a/cppu/inc/uno/threadpool.h b/cppu/inc/uno/threadpool.h index 0c647442e377..77bb712e53b4 100644 --- a/cppu/inc/uno/threadpool.h +++ b/cppu/inc/uno/threadpool.h @@ -168,6 +168,2 @@ uno_threadpool_putJob( @param hPool The handle to be disposed. - In case, hPool is 0, this function joins on all threads created - by the threadpool administration. This may e.g. used to ensure, that - no threads are inside the cppu library anymore, in case it needs to get - unloaded. @@ -182,2 +178,6 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C(); uno_threadpool_destroy(). + + A call to uno_threadpool_destroy can synchronously join on spawned worker + threads, so this function must never be called from such a worker thread. + @see uno_threadpool_create() diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx index cc22a453c79d..12ea09a131ee 100644 --- a/cppu/source/threadpool/thread.cxx +++ b/cppu/source/threadpool/thread.cxx @@ -35,3 +35,2 @@ #include <com/sun/star/uno/XInterface.hpp> -#include <rtl/instance.hxx> #include <rtl/ustring.h> @@ -50,13 +49,2 @@ namespace css = com::sun::star; using namespace osl; -extern "C" { - -void SAL_CALL cppu_requestThreadWorker( void *pVoid ) -{ - ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid; - - pThread->run(); - pThread->onTerminated(); -} - -} @@ -77,3 +65,3 @@ namespace cppu_threadpool { - void ThreadAdmin::add( ORequestThread *p ) + void ThreadAdmin::add( rtl::Reference< ORequestThread > const & p ) { @@ -92,8 +80,15 @@ namespace cppu_threadpool { - void ThreadAdmin::remove( ORequestThread * p ) + void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p ) + { + ::std::list< rtl::Reference< ORequestThread > >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); + if( ii != m_lst.end() ) + { + m_lst.erase( ii ); + } + } + + void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p ) { MutexGuard aGuard( m_mutex ); - ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); - OSL_ASSERT( ii != m_lst.end() ); - m_lst.erase( ii ); + remove_locked( p ); } @@ -106,33 +101,16 @@ namespace cppu_threadpool { } - ORequestThread *pCurrent; - do + for (;;) { - pCurrent = 0; + rtl::Reference< ORequestThread > pCurrent; { MutexGuard aGuard( m_mutex ); - if( ! m_lst.empty() ) + if( m_lst.empty() ) { - pCurrent = m_lst.front(); - pCurrent->setDeleteSelf( sal_False ); + break; } + pCurrent = m_lst.front(); + m_lst.pop_front(); } - if ( pCurrent ) - { - pCurrent->join(); - delete pCurrent; - } - } while( pCurrent ); - } - - struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin > - { - ThreadAdminHolder operator () () { - ThreadAdminHolder aRet(new ThreadAdmin()); - return aRet; + pCurrent->join(); } - }; - - ThreadAdminHolder& ThreadAdmin::getInstance() - { - return theThreadAdmin::get(); } @@ -140,7 +118,7 @@ namespace cppu_threadpool { // ---------------------------------------------------------------------------------- - ORequestThread::ORequestThread( JobQueue *pQueue, + ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron ) - : m_thread( 0 ) - , m_aThreadAdmin( ThreadAdmin::getInstance() ) + : m_aThreadPool( aThreadPool ) , m_pQueue( pQueue ) @@ -148,16 +126,5 @@ namespace cppu_threadpool { , m_bAsynchron( bAsynchron ) - , m_bDeleteSelf( sal_True ) - { - m_aThreadAdmin->add( this ); - } - - - ORequestThread::~ORequestThread() - { - if (m_thread != 0) - { - osl_destroyThread(m_thread); - } - } + {} + ORequestThread::~ORequestThread() {} @@ -172,18 +139,20 @@ namespace cppu_threadpool { - sal_Bool ORequestThread::create() + void ORequestThread::launch() { - OSL_ASSERT(m_thread == 0); // only one running thread per instance - - m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this); - if ( m_thread ) - { - osl_resumeThread( m_thread ); + // Assumption is that osl::Thread::create returns normally with a true + // return value iff it causes osl::Thread::run to start executing: + acquire(); + ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin(); + osl::ClearableMutexGuard g(rThreadAdmin.m_mutex); + rThreadAdmin.add( this ); + try { + if (!create()) { + throw std::runtime_error("osl::Thread::create failed"); + } + } catch (...) { + rThreadAdmin.remove_locked( this ); + g.clear(); + release(); + throw; } - - return m_thread != 0; - } - - void ORequestThread::join() - { - osl_joinWithThread( m_thread ); } @@ -192,7 +161,4 @@ namespace cppu_threadpool { { - m_aThreadAdmin->remove( this ); - if( m_bDeleteSelf ) - { - delete this; - } + m_aThreadPool->getThreadAdmin().remove( this ); + release(); } @@ -201,41 +167,49 @@ namespace cppu_threadpool { { - ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance(); - - while ( m_pQueue ) + try { - if( ! m_bAsynchron ) + while ( m_pQueue ) { - if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + if( ! m_bAsynchron ) { - OSL_ASSERT( false ); + if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + { + OSL_ASSERT( false ); + } } - } - while( ! m_pQueue->isEmpty() ) - { - // Note : Oneways should not get a disposable disposeid, - // It does not make sense to dispose a call in this state. - // That's way we put it an disposeid, that can't be used otherwise. - m_pQueue->enter( - sal::static_int_cast< sal_Int64 >( - reinterpret_cast< sal_IntPtr >(this)), - sal_True ); - - if( m_pQueue->isEmpty() ) + while( ! m_pQueue->isEmpty() ) { - theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); - // Note : revokeQueue might have failed because m_pQueue.isEmpty() - // may be false (race). + // Note : Oneways should not get a disposable disposeid, + // It does not make sense to dispose a call in this state. + // That's way we put it an disposeid, that can't be used otherwise. + m_pQueue->enter( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(this)), + sal_True ); + + if( m_pQueue->isEmpty() ) + { + m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); + // Note : revokeQueue might have failed because m_pQueue.isEmpty() + // may be false (race). + } } - } - delete m_pQueue; - m_pQueue = 0; + delete m_pQueue; + m_pQueue = 0; - if( ! m_bAsynchron ) - { - uno_releaseIdFromCurrentThread(); - } + if( ! m_bAsynchron ) + { + uno_releaseIdFromCurrentThread(); + } - theThreadPool->waitInPool( this ); + m_aThreadPool->waitInPool( this ); + } + } + catch (...) + { + // Work around the problem that onTerminated is not called if run + // throws an exception: + onTerminated(); + throw; } diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx index a3ea45aadaed..88f3d91f8722 100644 --- a/cppu/source/threadpool/thread.hxx +++ b/cppu/source/threadpool/thread.hxx @@ -30,8 +30,8 @@ -#include <list> +#include <osl/thread.hxx> #include <sal/types.h> - -#include <osl/thread.h> +#include <salhelper/simplereferenceobject.hxx> #include "jobqueue.hxx" +#include "threadpool.hxx" @@ -40,4 +40,2 @@ namespace cppu_threadpool { class JobQueue; - class ThreadAdmin; - typedef boost::shared_ptr<ThreadAdmin> ThreadAdminHolder; @@ -47,9 +45,11 @@ namespace cppu_threadpool { //----------------------------------------- - class ORequestThread + class ORequestThread: + public salhelper::SimpleReferenceObject, public osl::Thread { public: - ORequestThread( JobQueue * , + ORequestThread( ThreadPoolHolder const &aThreadPool, + JobQueue * , const ::rtl::ByteSequence &aThreadId, sal_Bool bAsynchron ); - ~ORequestThread(); + virtual ~ORequestThread(); @@ -57,12 +57,15 @@ namespace cppu_threadpool { - sal_Bool create(); - void join(); - void onTerminated(); - void run(); - inline void setDeleteSelf( sal_Bool b ) - { m_bDeleteSelf = b; } + void launch(); + + static inline void * operator new(std::size_t size) + { return SimpleReferenceObject::operator new(size); } + + static inline void operator delete(void * pointer) + { SimpleReferenceObject::operator delete(pointer); } private: - oslThread m_thread; - ThreadAdminHolder m_aThreadAdmin; + virtual void SAL_CALL run(); + virtual void SAL_CALL onTerminated(); + + ThreadPoolHolder m_aThreadPool; JobQueue *m_pQueue; @@ -70,19 +73,2 @@ namespace cppu_threadpool { sal_Bool m_bAsynchron; - sal_Bool m_bDeleteSelf; - }; - - class ThreadAdmin - { - public: - ThreadAdmin(); - ~ThreadAdmin (); - static ThreadAdminHolder &getInstance(); - void add( ORequestThread * ); - void remove( ORequestThread * ); - void join(); - - private: - ::osl::Mutex m_mutex; - ::std::list< ORequestThread * > m_lst; - bool m_disposed; }; diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx index f9f0be6c7d03..9099b7e1981e 100644 --- a/cppu/source/threadpool/threadpool.cxx +++ b/cppu/source/threadpool/threadpool.cxx @@ -111,11 +111,2 @@ namespace cppu_threadpool - struct theThreadPool : - public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool > - { - ThreadPoolHolder operator () () { - ThreadPoolHolder aRet(new ThreadPool()); - return aRet; - } - }; - ThreadPool::ThreadPool() @@ -134,7 +125,2 @@ namespace cppu_threadpool } - ThreadPoolHolder ThreadPool::getInstance() - { - return theThreadPool::get(); - } - @@ -142,34 +128,17 @@ namespace cppu_threadpool { - if( nDisposeId ) - { - m_DisposedCallerAdmin->dispose( nDisposeId ); + m_DisposedCallerAdmin->dispose( nDisposeId ); - MutexGuard guard( m_mutex ); - for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; - ii != m_mapQueue.end(); - ++ii) + MutexGuard guard( m_mutex ); + for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; + ii != m_mapQueue.end(); + ++ii) + { + if( (*ii).second.first ) { - if( (*ii).second.first ) - { - (*ii).second.first->dispose( nDisposeId ); - } - if( (*ii).second.second ) - { - (*ii).second.second->dispose( nDisposeId ); - } + (*ii).second.first->dispose( nDisposeId ); } - } - else - { + if( (*ii).second.second ) { - MutexGuard guard( m_mutexWaitingThreadList ); - for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; - ii != m_lstThreads.end() ; - ++ ii ) - { - // wake the threads up - osl_setCondition( (*ii)->condition ); - } + (*ii).second.second->dispose( nDisposeId ); } - ThreadAdmin::getInstance()->join(); } @@ -187,3 +156,3 @@ namespace cppu_threadpool ******************/ - void ThreadPool::waitInPool( ORequestThread * pThread ) + void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread ) { @@ -203,3 +172,3 @@ namespace cppu_threadpool MutexGuard guard ( m_mutexWaitingThreadList ); - if( waitingThread.thread ) + if( waitingThread.thread.is() ) { @@ -216,2 +185,17 @@ namespace cppu_threadpool + void ThreadPool::joinWorkers() + { + { + MutexGuard guard( m_mutexWaitingThreadList ); + for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; + ii != m_lstThreads.end() ; + ++ ii ) + { + // wake the threads up + osl_setCondition( (*ii)->condition ); + } + } + m_aThreadAdmin.join(); + } + void ThreadPool::createThread( JobQueue *pQueue , @@ -242,6 +226,5 @@ namespace cppu_threadpool { - ORequestThread *pThread = - new ORequestThread( pQueue , aThreadId, bAsynchron); - // deletes itself ! - pThread->create(); + rtl::Reference< ORequestThread > pThread( + new ORequestThread( this, pQueue , aThreadId, bAsynchron) ); + pThread->launch(); } @@ -387,2 +370,8 @@ namespace cppu_threadpool +// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life +// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty +// (within the last uno_threadpool_destroy) all worker threads spawned by that +// ThreadPool instance are joined (which implies that uno_threadpool_destroy +// must never be called from a worker thread); afterwards, the next call to +// uno_threadpool_create (if any) will lead to a new ThreadPool instance. @@ -417,2 +406,15 @@ struct _uno_ThreadPool +namespace { + +ThreadPoolHolder getThreadPool( uno_ThreadPool hPool ) +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + assert( g_pThreadpoolHashSet != 0 ); + ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) ); + assert( i != g_pThreadpoolHashSet->end() ); + return i->second; +} + +} + extern "C" uno_ThreadPool SAL_CALL @@ -421,2 +423,3 @@ uno_threadpool_create() SAL_THROW_EXTERN_C() MutexGuard guard( Mutex::getGlobalMutex() ); + ThreadPoolHolder p; if( ! g_pThreadpoolHashSet ) @@ -424,2 +427,8 @@ uno_threadpool_create() SAL_THROW_EXTERN_C() g_pThreadpoolHashSet = new ThreadpoolHashSet(); + p = new ThreadPool; + } + else + { + assert( !g_pThreadpoolHashSet->empty() ); + p = g_pThreadpoolHashSet->begin()->second; } @@ -428,3 +437,3 @@ uno_threadpool_create() SAL_THROW_EXTERN_C() uno_ThreadPool h = new struct _uno_ThreadPool; - g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); + g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) ); return h; @@ -433,3 +442,3 @@ uno_threadpool_create() SAL_THROW_EXTERN_C() extern "C" void SAL_CALL -uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C() +uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { @@ -437,3 +446,3 @@ uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C() uno_getIdOfCurrentThread( &pThreadId ); - ThreadPool::getInstance()->prepare( pThreadId ); + getThreadPool( hPool )->prepare( pThreadId ); rtl_byte_sequence_release( pThreadId ); @@ -449,3 +458,3 @@ uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) *ppJob = - ThreadPool::getInstance()->enter( + getThreadPool( hPool )->enter( pThreadId, @@ -465,3 +474,3 @@ extern "C" void SAL_CALL uno_threadpool_putJob( - uno_ThreadPool, + uno_ThreadPool hPool, sal_Sequence *pThreadId, @@ -471,3 +480,3 @@ uno_threadpool_putJob( { - ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); + getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest ); } @@ -477,3 +486,3 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - ThreadPool::getInstance()->dispose( + getThreadPool(hPool)->dispose( sal::static_int_cast< sal_Int64 >( @@ -485,5 +494,4 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - assert(hPool != 0); - - ThreadPool::getInstance()->destroy( + ThreadPoolHolder p( getThreadPool(hPool) ); + p->destroy( sal::static_int_cast< sal_Int64 >( @@ -512,3 +520,3 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() { - uno_threadpool_dispose( 0 ); + p->joinWorkers(); } diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx index 18bb47a1ff20..3ff52b362af1 100644 --- a/cppu/source/threadpool/threadpool.hxx +++ b/cppu/source/threadpool/threadpool.hxx @@ -27,2 +27,8 @@ ************************************************************************/ + +#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX +#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX + +#include <list> + #include <boost/unordered_map.hpp> @@ -32,2 +38,4 @@ #include <rtl/byteseq.hxx> +#include <rtl/ref.hxx> +#include <salhelper/simplereferenceobject.hxx> @@ -76,3 +84,3 @@ namespace cppu_threadpool { oslCondition condition; - ORequestThread *thread; + rtl::Reference< ORequestThread > thread; }; @@ -100,6 +108,24 @@ namespace cppu_threadpool { + class ThreadAdmin + { + public: + ThreadAdmin(); + ~ThreadAdmin (); + + void add( rtl::Reference< ORequestThread > const & ); + void remove( rtl::Reference< ORequestThread > const & ); + void join(); + + void remove_locked( rtl::Reference< ORequestThread > const & ); + ::osl::Mutex m_mutex; + + private: + ::std::list< rtl::Reference< ORequestThread > > m_lst; + bool m_disposed; + }; + class ThreadPool; - typedef boost::shared_ptr<ThreadPool> ThreadPoolHolder; + typedef rtl::Reference<ThreadPool> ThreadPoolHolder; - class ThreadPool + class ThreadPool: public salhelper::SimpleReferenceObject { @@ -108,3 +134,2 @@ namespace cppu_threadpool { ~ThreadPool(); - static ThreadPoolHolder getInstance(); @@ -126,3 +151,8 @@ namespace cppu_threadpool { - void waitInPool( ORequestThread *pThread ); + void waitInPool( rtl::Reference< ORequestThread > const & pThread ); + + void joinWorkers(); + + ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; } + private: @@ -138,2 +168,3 @@ namespace cppu_threadpool { DisposedCallerAdminHolder m_DisposedCallerAdmin; + ThreadAdmin m_aThreadAdmin; }; @@ -142,2 +173,4 @@ namespace cppu_threadpool { +#endif + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/cppu/util/makefile.mk b/cppu/util/makefile.mk index ea0c09488c85..cf8e1bd5133a 100644 --- a/cppu/util/makefile.mk +++ b/cppu/util/makefile.mk @@ -58,3 +58,3 @@ SHL1TARGET= uno_$(TARGET) -SHL1STDLIBS = $(SALLIB) +SHL1STDLIBS = $(SALLIB) $(SALHELPERLIB) |