diff options
Diffstat (limited to 'cppu/source/threadpool')
-rw-r--r-- | cppu/source/threadpool/current.cxx | 297 | ||||
-rw-r--r-- | cppu/source/threadpool/current.hxx | 48 | ||||
-rw-r--r-- | cppu/source/threadpool/jobqueue.cxx | 196 | ||||
-rw-r--r-- | cppu/source/threadpool/jobqueue.hxx | 85 | ||||
-rw-r--r-- | cppu/source/threadpool/makefile.mk | 49 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.cxx | 219 | ||||
-rw-r--r-- | cppu/source/threadpool/thread.hxx | 91 | ||||
-rw-r--r-- | cppu/source/threadpool/threadident.cxx | 135 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.cxx | 506 | ||||
-rw-r--r-- | cppu/source/threadpool/threadpool.hxx | 140 |
10 files changed, 1766 insertions, 0 deletions
diff --git a/cppu/source/threadpool/current.cxx b/cppu/source/threadpool/current.cxx new file mode 100644 index 000000000000..23bd52ca1917 --- /dev/null +++ b/cppu/source/threadpool/current.cxx @@ -0,0 +1,297 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +// MARKER(update_precomp.py): autogen include statement, do not remove +#include "precompiled_cppu.hxx" + +#include "rtl/uuid.h" +#include "osl/thread.h" +#include "osl/mutex.hxx" + +#include "uno/environment.hxx" +#include "uno/mapping.hxx" +#include "uno/lbnames.h" +#include "typelib/typedescription.h" + +#include "current.hxx" + + +using namespace ::osl; +using namespace ::rtl; +using namespace ::cppu; +using namespace ::com::sun::star::uno; + +namespace cppu +{ + +//-------------------------------------------------------------------------------------------------- +class SAL_NO_VTABLE XInterface +{ +public: + virtual void SAL_CALL slot_queryInterface() = 0; + virtual void SAL_CALL acquire() throw () = 0; + virtual void SAL_CALL release() throw () = 0; +}; +//-------------------------------------------------------------------------------------------------- +static typelib_InterfaceTypeDescription * get_type_XCurrentContext() +{ + static typelib_InterfaceTypeDescription * s_type_XCurrentContext = 0; + if (0 == s_type_XCurrentContext) + { + ::osl::MutexGuard aGuard( ::osl::Mutex::getGlobalMutex() ); + if (0 == s_type_XCurrentContext) + { + OUString sTypeName( RTL_CONSTASCII_USTRINGPARAM("com.sun.star.uno.XCurrentContext") ); + typelib_InterfaceTypeDescription * pTD = 0; + typelib_TypeDescriptionReference * pMembers[1] = { 0 }; + OUString sMethodName0( + RTL_CONSTASCII_USTRINGPARAM("com.sun.star.uno.XCurrentContext::getValueByName") ); + typelib_typedescriptionreference_new( + &pMembers[0], + typelib_TypeClass_INTERFACE_METHOD, + sMethodName0.pData ); + typelib_typedescription_newInterface( + &pTD, + sTypeName.pData, 0x00000000, 0x0000, 0x0000, 0x00000000, 0x00000000, + * typelib_static_type_getByTypeClass( typelib_TypeClass_INTERFACE ), + 1, + pMembers ); + + typelib_typedescription_register( (typelib_TypeDescription**)&pTD ); + typelib_typedescriptionreference_release( pMembers[0] ); + + typelib_InterfaceMethodTypeDescription * pMethod = 0; + typelib_Parameter_Init aParameters[1]; + OUString sParamName0( RTL_CONSTASCII_USTRINGPARAM("Name") ); + OUString sParamType0( RTL_CONSTASCII_USTRINGPARAM("string") ); + aParameters[0].pParamName = sParamName0.pData; + aParameters[0].eTypeClass = typelib_TypeClass_STRING; + aParameters[0].pTypeName = sParamType0.pData; + aParameters[0].bIn = sal_True; + aParameters[0].bOut = sal_False; + rtl_uString * pExceptions[1]; + OUString sExceptionName0( + RTL_CONSTASCII_USTRINGPARAM("com.sun.star.uno.RuntimeException") ); + pExceptions[0] = sExceptionName0.pData; + OUString sReturnType0( RTL_CONSTASCII_USTRINGPARAM("any") ); + typelib_typedescription_newInterfaceMethod( + &pMethod, + 3, sal_False, + sMethodName0.pData, + typelib_TypeClass_ANY, sReturnType0.pData, + 1, aParameters, 1, pExceptions ); + typelib_typedescription_register( (typelib_TypeDescription**)&pMethod ); + typelib_typedescription_release( (typelib_TypeDescription*)pMethod ); + // another static ref: + ++reinterpret_cast< typelib_TypeDescription * >( pTD )-> + nStaticRefCount; + s_type_XCurrentContext = pTD; + } + } + return s_type_XCurrentContext; +} + +//################################################################################################## + +//================================================================================================== +class ThreadKey +{ + sal_Bool _bInit; + oslThreadKey _hThreadKey; + oslThreadKeyCallbackFunction _pCallback; + +public: + inline oslThreadKey getThreadKey() SAL_THROW( () ); + + inline ThreadKey( oslThreadKeyCallbackFunction pCallback ) SAL_THROW( () ); + inline ~ThreadKey() SAL_THROW( () ); +}; +//__________________________________________________________________________________________________ +inline ThreadKey::ThreadKey( oslThreadKeyCallbackFunction pCallback ) SAL_THROW( () ) + : _bInit( sal_False ) + , _pCallback( pCallback ) +{ +} +//__________________________________________________________________________________________________ +inline ThreadKey::~ThreadKey() SAL_THROW( () ) +{ + if (_bInit) + { + ::osl_destroyThreadKey( _hThreadKey ); + } +} +//__________________________________________________________________________________________________ +inline oslThreadKey ThreadKey::getThreadKey() SAL_THROW( () ) +{ + if (! _bInit) + { + MutexGuard aGuard( Mutex::getGlobalMutex() ); + if (! _bInit) + { + _hThreadKey = ::osl_createThreadKey( _pCallback ); + _bInit = sal_True; + } + } + return _hThreadKey; +} + +//================================================================================================== +extern "C" void SAL_CALL delete_IdContainer( void * p ) +{ + if (p) + { + IdContainer * pId = reinterpret_cast< IdContainer * >( p ); + if (pId->pCurrentContext) + { + (*pId->pCurrentContextEnv->releaseInterface)( + pId->pCurrentContextEnv, pId->pCurrentContext ); + (*((uno_Environment *)pId->pCurrentContextEnv)->release)( + (uno_Environment *)pId->pCurrentContextEnv ); + } + if (pId->bInit) + { + ::rtl_byte_sequence_release( pId->pLocalThreadId ); + ::rtl_byte_sequence_release( pId->pCurrentId ); + } + delete pId; + } +} +//================================================================================================== +IdContainer * getIdContainer() SAL_THROW( () ) +{ + static ThreadKey s_key( delete_IdContainer ); + oslThreadKey aKey = s_key.getThreadKey(); + + IdContainer * pId = reinterpret_cast< IdContainer * >( ::osl_getThreadKeyData( aKey ) ); + if (! pId) + { + pId = new IdContainer(); + pId->pCurrentContext = 0; + pId->pCurrentContextEnv = 0; + pId->bInit = sal_False; + ::osl_setThreadKeyData( aKey, pId ); + } + return pId; +} + +} + +//################################################################################################## +extern "C" sal_Bool SAL_CALL uno_setCurrentContext( + void * pCurrentContext, + rtl_uString * pEnvTypeName, void * pEnvContext ) + SAL_THROW_EXTERN_C() +{ + IdContainer * pId = getIdContainer(); + OSL_ASSERT( pId ); + + // free old one + if (pId->pCurrentContext) + { + (*pId->pCurrentContextEnv->releaseInterface)( + pId->pCurrentContextEnv, pId->pCurrentContext ); + (*((uno_Environment *)pId->pCurrentContextEnv)->release)( + (uno_Environment *)pId->pCurrentContextEnv ); + pId->pCurrentContextEnv = 0; + + pId->pCurrentContext = 0; + } + + if (pCurrentContext) + { + uno_Environment * pEnv = 0; + ::uno_getEnvironment( &pEnv, pEnvTypeName, pEnvContext ); + OSL_ASSERT( pEnv && pEnv->pExtEnv ); + if (pEnv) + { + if (pEnv->pExtEnv) + { + pId->pCurrentContextEnv = pEnv->pExtEnv; + (*pId->pCurrentContextEnv->acquireInterface)( + pId->pCurrentContextEnv, pCurrentContext ); + pId->pCurrentContext = pCurrentContext; + } + else + { + (*pEnv->release)( pEnv ); + return sal_False; + } + } + else + { + return sal_False; + } + } + return sal_True; +} +//################################################################################################## +extern "C" sal_Bool SAL_CALL uno_getCurrentContext( + void ** ppCurrentContext, rtl_uString * pEnvTypeName, void * pEnvContext ) + SAL_THROW_EXTERN_C() +{ + IdContainer * pId = getIdContainer(); + OSL_ASSERT( pId ); + + Environment target_env; + + // release inout parameter + if (*ppCurrentContext) + { + target_env = Environment(rtl::OUString(pEnvTypeName), pEnvContext); + OSL_ASSERT( target_env.is() ); + if (! target_env.is()) + return sal_False; + uno_ExtEnvironment * pEnv = target_env.get()->pExtEnv; + OSL_ASSERT( 0 != pEnv ); + if (0 == pEnv) + return sal_False; + (*pEnv->releaseInterface)( pEnv, *ppCurrentContext ); + + *ppCurrentContext = 0; + } + + // case: null-ref + if (0 == pId->pCurrentContext) + return sal_True; + + if (! target_env.is()) + { + target_env = Environment(rtl::OUString(pEnvTypeName), pEnvContext); + OSL_ASSERT( target_env.is() ); + if (! target_env.is()) + return sal_False; + } + + Mapping mapping((uno_Environment *) pId->pCurrentContextEnv, target_env.get()); + OSL_ASSERT( mapping.is() ); + if (! mapping.is()) + return sal_False; + + mapping.mapInterface(ppCurrentContext, pId->pCurrentContext, ::cppu::get_type_XCurrentContext() ); + + return sal_True; +} diff --git a/cppu/source/threadpool/current.hxx b/cppu/source/threadpool/current.hxx new file mode 100644 index 000000000000..fbe0531cf178 --- /dev/null +++ b/cppu/source/threadpool/current.hxx @@ -0,0 +1,48 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +#include "rtl/byteseq.h" + +typedef struct _uno_ExtEnvironment uno_ExtEnvironment; + +namespace cppu +{ +struct CurrentContext; + +struct IdContainer +{ + void * pCurrentContext; + uno_ExtEnvironment * pCurrentContextEnv; + // + sal_Bool bInit; + sal_Sequence * pLocalThreadId; + sal_Int32 nRefCountOfCurrentId; + sal_Sequence * pCurrentId; +}; + +IdContainer * getIdContainer() SAL_THROW( () ); +} diff --git a/cppu/source/threadpool/jobqueue.cxx b/cppu/source/threadpool/jobqueue.cxx new file mode 100644 index 000000000000..333a350c0b23 --- /dev/null +++ b/cppu/source/threadpool/jobqueue.cxx @@ -0,0 +1,196 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +// MARKER(update_precomp.py): autogen include statement, do not remove +#include "precompiled_cppu.hxx" +#include "jobqueue.hxx" +#include "threadpool.hxx" + +#include <osl/diagnose.h> + +using namespace ::osl; + +namespace cppu_threadpool { + + JobQueue::JobQueue() : + m_nToDo( 0 ), + m_bSuspended( sal_False ), + m_cndWait( osl_createCondition() ) + { + osl_resetCondition( m_cndWait ); + m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); + } + + JobQueue::~JobQueue() + { + osl_destroyCondition( m_cndWait ); + } + + + void JobQueue::add( void *pThreadSpecificData, RequestFun * doRequest ) + { + MutexGuard guard( m_mutex ); + Job job = { pThreadSpecificData , doRequest }; + m_lstJob.push_back( job ); + if( ! m_bSuspended ) + { + osl_setCondition( m_cndWait ); + } + m_nToDo ++; + } + + void *JobQueue::enter( sal_Int64 nDisposeId , sal_Bool bReturnWhenNoJob ) + { + void *pReturn = 0; + { + // synchronize with the dispose calls + MutexGuard guard( m_mutex ); + if( m_DisposedCallerAdmin->isDisposed( nDisposeId ) ) + { + return 0; + } + m_lstCallstack.push_front( nDisposeId ); + } + + + while( sal_True ) + { + if( bReturnWhenNoJob ) + { + MutexGuard guard( m_mutex ); + if( m_lstJob.empty() ) + { + break; + } + } + + osl_waitCondition( m_cndWait , 0 ); + + struct Job job={0,0}; + { + // synchronize with add and dispose calls + MutexGuard guard( m_mutex ); + + if( 0 == m_lstCallstack.front() ) + { + // disposed ! + if( m_lstJob.empty() ) + { + osl_resetCondition( m_cndWait ); + } + break; + } + + OSL_ASSERT( ! m_lstJob.empty() ); + if( ! m_lstJob.empty() ) + { + job = m_lstJob.front(); + m_lstJob.pop_front(); + } + if( m_lstJob.empty() ) + { + osl_resetCondition( m_cndWait ); + } + } + + if( job.doRequest ) + { + job.doRequest( job.pThreadSpecificData ); + m_nToDo --; + } + else + { + m_nToDo --; + pReturn = job.pThreadSpecificData; + break; + } + } + + { + // synchronize with the dispose calls + MutexGuard guard( m_mutex ); + m_lstCallstack.pop_front(); + } + + return pReturn; + } + + void JobQueue::dispose( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + for( CallStackList::iterator ii = m_lstCallstack.begin() ; + ii != m_lstCallstack.end() ; + ++ii ) + { + if( (*ii) == nDisposeId ) + { + (*ii) = 0; + } + } + + if( !m_lstCallstack.empty() && ! m_lstCallstack.front() ) + { + // The thread is waiting for a disposed pCallerId, let it go + osl_setCondition( m_cndWait ); + } + } + + void JobQueue::suspend() + { + MutexGuard guard( m_mutex ); + m_bSuspended = sal_True; + } + + void JobQueue::resume() + { + MutexGuard guard( m_mutex ); + m_bSuspended = sal_False; + if( ! m_lstJob.empty() ) + { + osl_setCondition( m_cndWait ); + } + } + + sal_Bool JobQueue::isEmpty() + { + MutexGuard guard( m_mutex ); + return m_lstJob.empty(); + } + + sal_Bool JobQueue::isCallstackEmpty() + { + MutexGuard guard( m_mutex ); + return m_lstCallstack.empty(); + } + + sal_Bool JobQueue::isBusy() + { + return m_nToDo > 0; + } + + +} diff --git a/cppu/source/threadpool/jobqueue.hxx b/cppu/source/threadpool/jobqueue.hxx new file mode 100644 index 000000000000..78202573403e --- /dev/null +++ b/cppu/source/threadpool/jobqueue.hxx @@ -0,0 +1,85 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +#ifndef _CPPU_THREADPOOL_JOBQUEUE_HXX_ +#define _CPPU_THREADPOOL_JOBQUEUE_HXX_ + +#include <list> +#include <sal/types.h> + +#include <osl/conditn.h> +#include <osl/mutex.hxx> + +#include <boost/shared_ptr.hpp> + +namespace cppu_threadpool +{ + extern "C" typedef void (SAL_CALL RequestFun)(void *); + + struct Job + { + void *pThreadSpecificData; + RequestFun * doRequest; + }; + + typedef ::std::list < struct Job > JobList; + + typedef ::std::list < sal_Int64 > CallStackList; + + class DisposedCallerAdmin; + typedef boost::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder; + + class JobQueue + { + public: + JobQueue(); + ~JobQueue(); + + void add( void *pThreadSpecificData, RequestFun * doRequest ); + + void *enter( sal_Int64 nDisposeId , sal_Bool bReturnWhenNoJob = sal_False ); + void dispose( sal_Int64 nDisposeId ); + + void suspend(); + void resume(); + + sal_Bool isEmpty(); + sal_Bool isCallstackEmpty(); + sal_Bool isBusy(); + + private: + ::osl::Mutex m_mutex; + JobList m_lstJob; + CallStackList m_lstCallstack; + sal_Int32 m_nToDo; + sal_Bool m_bSuspended; + oslCondition m_cndWait; + DisposedCallerAdminHolder m_DisposedCallerAdmin; + }; +} + +#endif diff --git a/cppu/source/threadpool/makefile.mk b/cppu/source/threadpool/makefile.mk new file mode 100644 index 000000000000..ea5e146b9cf6 --- /dev/null +++ b/cppu/source/threadpool/makefile.mk @@ -0,0 +1,49 @@ +#************************************************************************* +# +# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. +# +# Copyright 2000, 2010 Oracle and/or its affiliates. +# +# OpenOffice.org - a multi-platform office productivity suite +# +# This file is part of OpenOffice.org. +# +# OpenOffice.org is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License version 3 +# only, as published by the Free Software Foundation. +# +# OpenOffice.org is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License version 3 for more details +# (a copy is included in the LICENSE file that accompanied this code). +# +# You should have received a copy of the GNU Lesser General Public License +# version 3 along with OpenOffice.org. If not, see +# <http://www.openoffice.org/license.html> +# for a copy of the LGPLv3 License. +# +#************************************************************************* +PRJ=..$/.. + +PRJNAME=cppu +TARGET=cppu_threadpool + +# --- Settings ----------------------------------------------------- + +.INCLUDE : ..$/..$/util$/makefile.pmk +.INCLUDE : settings.mk + +# ------------------------------------------------------------------ + +SLOFILES=\ + $(SLO)$/threadpool.obj\ + $(SLO)$/jobqueue.obj\ + $(SLO)$/thread.obj\ + $(SLO)$/threadident.obj\ + $(SLO)$/current.obj + +# --- Targets ------------------------------------------------------ + +.INCLUDE : ..$/..$/util$/target.pmk +.INCLUDE : target.mk diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx new file mode 100644 index 000000000000..00c91500d1e5 --- /dev/null +++ b/cppu/source/threadpool/thread.cxx @@ -0,0 +1,219 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +// MARKER(update_precomp.py): autogen include statement, do not remove +#include "precompiled_cppu.hxx" +#include <stdio.h> +#include <osl/diagnose.h> +#include <uno/threadpool.h> + +#include <rtl/instance.hxx> + +#include "thread.hxx" +#include "jobqueue.hxx" +#include "threadpool.hxx" + + +using namespace osl; +extern "C" { + +void SAL_CALL cppu_requestThreadWorker( void *pVoid ) +{ + ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid; + + pThread->run(); + pThread->onTerminated(); +} + +} + +namespace cppu_threadpool { + +// ---------------------------------------------------------------------------------- + ThreadAdmin::~ThreadAdmin() + { +#if OSL_DEBUG_LEVEL > 1 + if( m_lst.size() ) + { + fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) ); + } +#endif + } + + void ThreadAdmin::add( ORequestThread *p ) + { + MutexGuard aGuard( m_mutex ); + m_lst.push_back( p ); + } + + void ThreadAdmin::remove( ORequestThread * p ) + { + MutexGuard aGuard( m_mutex ); + ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); + OSL_ASSERT( ii != m_lst.end() ); + m_lst.erase( ii ); + } + + void ThreadAdmin::join() + { + ORequestThread *pCurrent; + do + { + pCurrent = 0; + { + MutexGuard aGuard( m_mutex ); + if( ! m_lst.empty() ) + { + pCurrent = m_lst.front(); + pCurrent->setDeleteSelf( sal_False ); + } + } + if ( pCurrent ) + { + pCurrent->join(); + delete pCurrent; + } + } while( pCurrent ); + } + + struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin > + { + ThreadAdminHolder operator () () { + ThreadAdminHolder aRet(new ThreadAdmin()); + return aRet; + } + }; + + ThreadAdminHolder& ThreadAdmin::getInstance() + { + return theThreadAdmin::get(); + } + +// ---------------------------------------------------------------------------------- + ORequestThread::ORequestThread( JobQueue *pQueue, + const ByteSequence &aThreadId, + sal_Bool bAsynchron ) + : m_thread( 0 ) + , m_aThreadAdmin( ThreadAdmin::getInstance() ) + , m_pQueue( pQueue ) + , m_aThreadId( aThreadId ) + , m_bAsynchron( bAsynchron ) + , m_bDeleteSelf( sal_True ) + { + m_aThreadAdmin->add( this ); + } + + + ORequestThread::~ORequestThread() + { + if (m_thread != 0) + { + osl_destroyThread(m_thread); + } + } + + + void ORequestThread::setTask( JobQueue *pQueue, + const ByteSequence &aThreadId, + sal_Bool bAsynchron ) + { + m_pQueue = pQueue; + m_aThreadId = aThreadId; + m_bAsynchron = bAsynchron; + } + + sal_Bool ORequestThread::create() + { + OSL_ASSERT(m_thread == 0); // only one running thread per instance + + m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this); + if ( m_thread ) + { + osl_resumeThread( m_thread ); + } + + return m_thread != 0; + } + + void ORequestThread::join() + { + osl_joinWithThread( m_thread ); + } + + void ORequestThread::onTerminated() + { + m_aThreadAdmin->remove( this ); + if( m_bDeleteSelf ) + { + delete this; + } + } + + void ORequestThread::run() + { + ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance(); + + while ( m_pQueue ) + { + if( ! m_bAsynchron ) + { + if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) + { + OSL_ASSERT( false ); + } + } + + while( ! m_pQueue->isEmpty() ) + { + // Note : Oneways should not get a disposable disposeid, + // It does not make sense to dispose a call in this state. + // That's way we put it an disposeid, that can't be used otherwise. + m_pQueue->enter( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(this)), + sal_True ); + + if( m_pQueue->isEmpty() ) + { + theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); + // Note : revokeQueue might have failed because m_pQueue.isEmpty() + // may be false (race). + } + } + + delete m_pQueue; + m_pQueue = 0; + + if( ! m_bAsynchron ) + { + uno_releaseIdFromCurrentThread(); + } + + theThreadPool->waitInPool( this ); + } + } +} diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx new file mode 100644 index 000000000000..2f7791daf7e5 --- /dev/null +++ b/cppu/source/threadpool/thread.hxx @@ -0,0 +1,91 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ +#ifndef _CPPU_THREADPOOL_THREAD_HXX +#define _CPPU_THREADPOOL_THREAD_HXX + +#include <list> +#include <sal/types.h> + +#include <osl/thread.h> + +#include "jobqueue.hxx" + +namespace cppu_threadpool { + + class JobQueue; + class ThreadAdmin; + typedef boost::shared_ptr<ThreadAdmin> ThreadAdminHolder; + + //----------------------------------------- + // private thread class for the threadpool + // independent from vos + //----------------------------------------- + class ORequestThread + { + public: + ORequestThread( JobQueue * , + const ::rtl::ByteSequence &aThreadId, + sal_Bool bAsynchron ); + ~ORequestThread(); + + void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , sal_Bool bAsynchron ); + + sal_Bool create(); + void join(); + void onTerminated(); + void run(); + inline void setDeleteSelf( sal_Bool b ) + { m_bDeleteSelf = b; } + + private: + oslThread m_thread; + ThreadAdminHolder m_aThreadAdmin; + JobQueue *m_pQueue; + ::rtl::ByteSequence m_aThreadId; + sal_Bool m_bAsynchron; + sal_Bool m_bDeleteSelf; + }; + + class ThreadAdmin + { + public: + ~ThreadAdmin (); + static ThreadAdminHolder &getInstance(); + void add( ORequestThread * ); + void remove( ORequestThread * ); + void join(); + + private: + ::osl::Mutex m_mutex; + ::std::list< ORequestThread * > m_lst; + }; + +} // end cppu_threadpool + + +#endif + diff --git a/cppu/source/threadpool/threadident.cxx b/cppu/source/threadpool/threadident.cxx new file mode 100644 index 000000000000..0fb6c1196185 --- /dev/null +++ b/cppu/source/threadpool/threadident.cxx @@ -0,0 +1,135 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +// MARKER(update_precomp.py): autogen include statement, do not remove +#include "precompiled_cppu.hxx" +#include <stdio.h> + +#include <list> + +#include <osl/mutex.hxx> +#include <osl/thread.h> +#include <osl/diagnose.h> + +#include <rtl/process.h> +#include <rtl/byteseq.hxx> + +#include <uno/threadpool.h> + +#include "current.hxx" + + +using namespace ::std; +using namespace ::osl; +using namespace ::rtl; +using namespace ::cppu; + + +static inline void createLocalId( sal_Sequence **ppThreadId ) +{ + rtl_byte_sequence_constructNoDefault( ppThreadId , 4 + 16 ); + *((sal_Int32*) ((*ppThreadId)->elements)) = osl_getThreadIdentifier(0); + + rtl_getGlobalProcessId( (sal_uInt8 * ) &( (*ppThreadId)->elements[4]) ); +} + + +extern "C" void SAL_CALL +uno_getIdOfCurrentThread( sal_Sequence **ppThreadId ) + SAL_THROW_EXTERN_C() +{ + IdContainer * p = getIdContainer(); + if( ! p->bInit ) + { + // first time, that the thread enters the bridge + createLocalId( ppThreadId ); + + // TODO + // note : this is a leak ! + p->pLocalThreadId = *ppThreadId; + p->pCurrentId = *ppThreadId; + p->nRefCountOfCurrentId = 1; + rtl_byte_sequence_acquire( p->pLocalThreadId ); + rtl_byte_sequence_acquire( p->pCurrentId ); + p->bInit = sal_True; + } + else + { + p->nRefCountOfCurrentId ++; + if( *ppThreadId ) + { + rtl_byte_sequence_release( *ppThreadId ); + } + *ppThreadId = p->pCurrentId; + rtl_byte_sequence_acquire( *ppThreadId ); + } +} + + +extern "C" void SAL_CALL uno_releaseIdFromCurrentThread() + SAL_THROW_EXTERN_C() +{ + IdContainer *p = getIdContainer(); + OSL_ASSERT( p ); + OSL_ASSERT( p->nRefCountOfCurrentId ); + + p->nRefCountOfCurrentId --; + if( ! p->nRefCountOfCurrentId && (p->pLocalThreadId != p->pCurrentId) ) + { + rtl_byte_sequence_assign( &(p->pCurrentId) , p->pLocalThreadId ); + } +} + +extern "C" sal_Bool SAL_CALL uno_bindIdToCurrentThread( sal_Sequence *pThreadId ) + SAL_THROW_EXTERN_C() +{ + IdContainer *p = getIdContainer(); + if( ! p->bInit ) + { + p->pLocalThreadId = 0; + createLocalId( &(p->pLocalThreadId) ); + p->nRefCountOfCurrentId = 1; + p->pCurrentId = pThreadId; + rtl_byte_sequence_acquire( p->pCurrentId ); + p->bInit = sal_True; + } + else + { + OSL_ASSERT( 0 == p->nRefCountOfCurrentId ); + if( 0 == p->nRefCountOfCurrentId ) + { + rtl_byte_sequence_assign(&( p->pCurrentId ), pThreadId ); + p->nRefCountOfCurrentId ++; + } + else + { + return sal_False; + } + + } + return sal_True; +} diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx new file mode 100644 index 000000000000..b7df159a80b9 --- /dev/null +++ b/cppu/source/threadpool/threadpool.cxx @@ -0,0 +1,506 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ + +// MARKER(update_precomp.py): autogen include statement, do not remove +#include "precompiled_cppu.hxx" +#include <hash_set> +#include <stdio.h> + +#include <osl/diagnose.h> +#include <osl/mutex.hxx> +#include <osl/thread.h> +#include <rtl/instance.hxx> + +#include <uno/threadpool.h> + +#include "threadpool.hxx" +#include "thread.hxx" + +using namespace ::std; +using namespace ::osl; + +namespace cppu_threadpool +{ + struct theDisposedCallerAdmin : + public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin > + { + DisposedCallerAdminHolder operator () () { + return DisposedCallerAdminHolder(new DisposedCallerAdmin()); + } + }; + + DisposedCallerAdminHolder DisposedCallerAdmin::getInstance() + { + return theDisposedCallerAdmin::get(); + } + + DisposedCallerAdmin::~DisposedCallerAdmin() + { +#if OSL_DEBUG_LEVEL > 1 + if( !m_lst.empty() ) + { + printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( ))); + } +#endif + } + + void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + m_lst.push_back( nDisposeId ); + } + + void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + for( DisposedCallerList::iterator ii = m_lst.begin() ; + ii != m_lst.end() ; + ++ ii ) + { + if( (*ii) == nDisposeId ) + { + m_lst.erase( ii ); + break; + } + } + } + + sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId ) + { + MutexGuard guard( m_mutex ); + for( DisposedCallerList::iterator ii = m_lst.begin() ; + ii != m_lst.end() ; + ++ ii ) + { + if( (*ii) == nDisposeId ) + { + return sal_True; + } + } + return sal_False; + } + + + //------------------------------------------------------------------------------- + + struct theThreadPool : + public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool > + { + ThreadPoolHolder operator () () { + ThreadPoolHolder aRet(new ThreadPool()); + return aRet; + } + }; + + ThreadPool::ThreadPool() + { + m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); + } + + ThreadPool::~ThreadPool() + { +#if OSL_DEBUG_LEVEL > 1 + if( m_mapQueue.size() ) + { + printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) ); + } +#endif + } + ThreadPoolHolder ThreadPool::getInstance() + { + return theThreadPool::get(); + } + + + void ThreadPool::dispose( sal_Int64 nDisposeId ) + { + if( nDisposeId ) + { + m_DisposedCallerAdmin->dispose( nDisposeId ); + + MutexGuard guard( m_mutex ); + for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; + ii != m_mapQueue.end(); + ++ii) + { + if( (*ii).second.first ) + { + (*ii).second.first->dispose( nDisposeId ); + } + if( (*ii).second.second ) + { + (*ii).second.second->dispose( nDisposeId ); + } + } + } + else + { + { + MutexGuard guard( m_mutexWaitingThreadList ); + for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; + ii != m_lstThreads.end() ; + ++ ii ) + { + // wake the threads up + osl_setCondition( (*ii)->condition ); + } + } + ThreadAdmin::getInstance()->join(); + } + } + + void ThreadPool::stopDisposing( sal_Int64 nDisposeId ) + { + m_DisposedCallerAdmin->stopDisposing( nDisposeId ); + } + + /****************** + * This methods lets the thread wait a certain amount of time. If within this timespan + * a new request comes in, this thread is reused. This is done only to improve performance, + * it is not required for threadpool functionality. + ******************/ + void ThreadPool::waitInPool( ORequestThread * pThread ) + { + struct WaitingThread waitingThread; + waitingThread.condition = osl_createCondition(); + waitingThread.thread = pThread; + { + MutexGuard guard( m_mutexWaitingThreadList ); + m_lstThreads.push_front( &waitingThread ); + } + + // let the thread wait 2 seconds + TimeValue time = { 2 , 0 }; + osl_waitCondition( waitingThread.condition , &time ); + + { + MutexGuard guard ( m_mutexWaitingThreadList ); + if( waitingThread.thread ) + { + // thread wasn't reused, remove it from the list + WaitingThreadList::iterator ii = find( + m_lstThreads.begin(), m_lstThreads.end(), &waitingThread ); + OSL_ASSERT( ii != m_lstThreads.end() ); + m_lstThreads.erase( ii ); + } + } + + osl_destroyCondition( waitingThread.condition ); + } + + void ThreadPool::createThread( JobQueue *pQueue , + const ByteSequence &aThreadId, + sal_Bool bAsynchron ) + { + sal_Bool bCreate = sal_True; + { + // Can a thread be reused ? + MutexGuard guard( m_mutexWaitingThreadList ); + if( ! m_lstThreads.empty() ) + { + // inform the thread and let it go + struct WaitingThread *pWaitingThread = m_lstThreads.back(); + pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron ); + pWaitingThread->thread = 0; + + // remove from list + m_lstThreads.pop_back(); + + // let the thread go + osl_setCondition( pWaitingThread->condition ); + bCreate = sal_False; + } + } + + if( bCreate ) + { + ORequestThread *pThread = + new ORequestThread( pQueue , aThreadId, bAsynchron); + // deletes itself ! + pThread->create(); + } + } + + sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron ) + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + OSL_ASSERT( ii != m_mapQueue.end() ); + + if( bAsynchron ) + { + if( ! (*ii).second.second->isEmpty() ) + { + // another thread has put something into the queue + return sal_False; + } + + (*ii).second.second = 0; + if( (*ii).second.first ) + { + // all oneway request have been processed, now + // synchronus requests may go on + (*ii).second.first->resume(); + } + } + else + { + if( ! (*ii).second.first->isEmpty() ) + { + // another thread has put something into the queue + return sal_False; + } + (*ii).second.first = 0; + } + + if( 0 == (*ii).second.first && 0 == (*ii).second.second ) + { + m_mapQueue.erase( ii ); + } + + return sal_True; + } + + + void ThreadPool::addJob( + const ByteSequence &aThreadId , + sal_Bool bAsynchron, + void *pThreadSpecificData, + RequestFun * doRequest ) + { + sal_Bool bCreateThread = sal_False; + JobQueue *pQueue = 0; + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + if( ii == m_mapQueue.end() ) + { + m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 ); + ii = m_mapQueue.find( aThreadId ); + OSL_ASSERT( ii != m_mapQueue.end() ); + } + + if( bAsynchron ) + { + if( ! (*ii).second.second ) + { + (*ii).second.second = new JobQueue(); + bCreateThread = sal_True; + } + pQueue = (*ii).second.second; + } + else + { + if( ! (*ii).second.first ) + { + (*ii).second.first = new JobQueue(); + bCreateThread = sal_True; + } + pQueue = (*ii).second.first; + + if( (*ii).second.second && ( (*ii).second.second->isBusy() ) ) + { + pQueue->suspend(); + } + } + pQueue->add( pThreadSpecificData , doRequest ); + } + + if( bCreateThread ) + { + createThread( pQueue , aThreadId , bAsynchron); + } + } + + void ThreadPool::prepare( const ByteSequence &aThreadId ) + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + if( ii == m_mapQueue.end() ) + { + JobQueue *p = new JobQueue(); + m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 ); + } + else if( 0 == (*ii).second.first ) + { + (*ii).second.first = new JobQueue(); + } + } + + void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId ) + { + JobQueue *pQueue = 0; + { + MutexGuard guard( m_mutex ); + + ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); + + OSL_ASSERT( ii != m_mapQueue.end() ); + pQueue = (*ii).second.first; + } + + OSL_ASSERT( pQueue ); + void *pReturn = pQueue->enter( nDisposeId ); + + if( pQueue->isCallstackEmpty() ) + { + if( revokeQueue( aThreadId , sal_False) ) + { + // remove queue + delete pQueue; + } + } + return pReturn; + } +} + + +using namespace cppu_threadpool; + +struct uno_ThreadPool_Equal +{ + sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const + { + return a == b; + } +}; + +struct uno_ThreadPool_Hash +{ + sal_Size operator () ( const uno_ThreadPool &a ) const + { + return (sal_Size) a; + } +}; + + + +typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet; + +static ThreadpoolHashSet *g_pThreadpoolHashSet; + +struct _uno_ThreadPool +{ + sal_Int32 dummy; +}; + +extern "C" uno_ThreadPool SAL_CALL +uno_threadpool_create() SAL_THROW_EXTERN_C() +{ + MutexGuard guard( Mutex::getGlobalMutex() ); + if( ! g_pThreadpoolHashSet ) + { + g_pThreadpoolHashSet = new ThreadpoolHashSet(); + } + + // Just ensure that the handle is unique in the process (via heap) + uno_ThreadPool h = new struct _uno_ThreadPool; + g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); + return h; +} + +extern "C" void SAL_CALL +uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C() +{ + sal_Sequence *pThreadId = 0; + uno_getIdOfCurrentThread( &pThreadId ); + ThreadPool::getInstance()->prepare( pThreadId ); + rtl_byte_sequence_release( pThreadId ); + uno_releaseIdFromCurrentThread(); +} + +extern "C" void SAL_CALL +uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) + SAL_THROW_EXTERN_C() +{ + sal_Sequence *pThreadId = 0; + uno_getIdOfCurrentThread( &pThreadId ); + *ppJob = + ThreadPool::getInstance()->enter( + pThreadId, + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(hPool)) ); + rtl_byte_sequence_release( pThreadId ); + uno_releaseIdFromCurrentThread(); +} + +extern "C" void SAL_CALL +uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C() +{ + // we might do here some tiding up in case a thread called attach but never detach +} + +extern "C" void SAL_CALL +uno_threadpool_putJob( + uno_ThreadPool, + sal_Sequence *pThreadId, + void *pJob, + void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), + sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() +{ + ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); +} + +extern "C" void SAL_CALL +uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + ThreadPool::getInstance()->dispose( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(hPool)) ); +} + +extern "C" void SAL_CALL +uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() +{ + ThreadPool::getInstance()->stopDisposing( + sal::static_int_cast< sal_Int64 >( + reinterpret_cast< sal_IntPtr >(hPool)) ); + + if( hPool ) + { + // special treatment for 0 ! + OSL_ASSERT( g_pThreadpoolHashSet ); + + MutexGuard guard( Mutex::getGlobalMutex() ); + + ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool ); + OSL_ASSERT( ii != g_pThreadpoolHashSet->end() ); + g_pThreadpoolHashSet->erase( ii ); + delete hPool; + + if( g_pThreadpoolHashSet->empty() ) + { + delete g_pThreadpoolHashSet; + g_pThreadpoolHashSet = 0; + } + } +} diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx new file mode 100644 index 000000000000..e1c9a127fa55 --- /dev/null +++ b/cppu/source/threadpool/threadpool.hxx @@ -0,0 +1,140 @@ +/************************************************************************* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * Copyright 2000, 2010 Oracle and/or its affiliates. + * + * OpenOffice.org - a multi-platform office productivity suite + * + * This file is part of OpenOffice.org. + * + * OpenOffice.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License version 3 + * only, as published by the Free Software Foundation. + * + * OpenOffice.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License version 3 for more details + * (a copy is included in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU Lesser General Public License + * version 3 along with OpenOffice.org. If not, see + * <http://www.openoffice.org/license.html> + * for a copy of the LGPLv3 License. + * + ************************************************************************/ +#include <hash_map> + +#include <osl/conditn.h> + +#include <rtl/byteseq.hxx> + +#include <boost/shared_ptr.hpp> + +#include "jobqueue.hxx" + + +using namespace ::rtl; +namespace cppu_threadpool { + class ORequestThread; + + struct EqualThreadId + { + sal_Int32 operator () ( const ::rtl::ByteSequence &a , const ::rtl::ByteSequence &b ) const + { + return a == b; + } + }; + + struct HashThreadId + { + sal_Int32 operator () ( const ::rtl::ByteSequence &a ) const + { + if( a.getLength() >= 4 ) + { + return *(sal_Int32 *)a.getConstArray(); + } + return 0; + } + }; + + typedef ::std::hash_map + < + ByteSequence, // ThreadID + ::std::pair < JobQueue * , JobQueue * >, + HashThreadId, + EqualThreadId + > ThreadIdHashMap; + + typedef ::std::list < sal_Int64 > DisposedCallerList; + + + struct WaitingThread + { + oslCondition condition; + ORequestThread *thread; + }; + + typedef ::std::list < struct ::cppu_threadpool::WaitingThread * > WaitingThreadList; + + class DisposedCallerAdmin; + typedef boost::shared_ptr<DisposedCallerAdmin> DisposedCallerAdminHolder; + + class DisposedCallerAdmin + { + public: + ~DisposedCallerAdmin(); + + static DisposedCallerAdminHolder getInstance(); + + void dispose( sal_Int64 nDisposeId ); + void stopDisposing( sal_Int64 nDisposeId ); + sal_Bool isDisposed( sal_Int64 nDisposeId ); + + private: + ::osl::Mutex m_mutex; + DisposedCallerList m_lst; + }; + + class ThreadPool; + typedef boost::shared_ptr<ThreadPool> ThreadPoolHolder; + + class ThreadPool + { + public: + ThreadPool(); + ~ThreadPool(); + static ThreadPoolHolder getInstance(); + + void dispose( sal_Int64 nDisposeId ); + void stopDisposing( sal_Int64 nDisposeId ); + + void addJob( const ByteSequence &aThreadId, + sal_Bool bAsynchron, + void *pThreadSpecificData, + RequestFun * doRequest ); + + void prepare( const ByteSequence &aThreadId ); + void * enter( const ByteSequence &aThreadId, sal_Int64 nDisposeId ); + + /******** + * @return true, if queue could be succesfully revoked. + ********/ + sal_Bool revokeQueue( const ByteSequence & aThreadId , sal_Bool bAsynchron ); + + void waitInPool( ORequestThread *pThread ); + private: + void createThread( JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron); + + + ThreadIdHashMap m_mapQueue; + ::osl::Mutex m_mutex; + + ::osl::Mutex m_mutexWaitingThreadList; + WaitingThreadList m_lstThreads; + + DisposedCallerAdminHolder m_DisposedCallerAdmin; + }; + +} // end namespace cppu_threadpool |