summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephan Bergmann <sbergman@redhat.com>2012-05-16 22:09:21 +0200
committerStephan Bergmann <sbergman@redhat.com>2012-05-16 22:09:21 +0200
commitd015384e1d98fe77fd59339044f58efb1ab9fb25 (patch)
tree27972d297a53f302b148f542d11983b54328496c
parent9aad89df8757d4570084424e63b9562a13448bde (diff)
Fixed ThreadPool (and dependent ORequestThread) life cycle
At least with sw_complex test under load, it happened that an ORequestThread could still process a remote release request while the main thread was already in exit(3). This was because (a) ThreadPool never joined with the spawned worker threads (which has been rectified by calling uno_threadpool_dispose(0) from the final uno_threadpool_destroy), and (b) binaryurp::Bridge called uno_threadpool_destroy only from its destructor (which could go as late as exit(3)) instead of from terminate. Additional clean up: * Access to Bridge's threadPool_ is now cleanly controlled by mutex_ (even though that might not be necessary in every case). * ThreadPool's stopDisposing got renamed to destroy, to make meaning clearer. Change-Id: I45fa76e80e790a11065e7bf8ac9d92af2e62f262
-rw-r--r--binaryurp/source/bridge.cxx52
-rw-r--r--binaryurp/source/bridge.hxx4
-rw-r--r--cppu/source/threadpool/threadpool.cxx24
-rw-r--r--cppu/source/threadpool/threadpool.hxx4
4 files changed, 52 insertions, 32 deletions
diff --git a/binaryurp/source/bridge.cxx b/binaryurp/source/bridge.cxx
index 50b873f6a70f..2d1f622369a3 100644
--- a/binaryurp/source/bridge.cxx
+++ b/binaryurp/source/bridge.cxx
@@ -232,36 +232,43 @@ Bridge::Bridge(
css::uno::Reference< css::uno::XInterface >());
}
passive_.set();
}
void Bridge::start() {
- assert(threadPool_ == 0 && !writer_.is() && !reader_.is());
- threadPool_ = uno_threadpool_create();
- assert(threadPool_ != 0);
- writer_.set(new Writer(this));
- writer_->launch();
- reader_.set(new Reader(this));
- reader_->launch();
- // it is important to call reader_->launch() last here; both
- // Writer::execute and Reader::execute can call Bridge::terminate, but
- // Writer::execute is initially blocked in unblocked_.wait() until
- // Reader::execute has called bridge_->sendRequestChangeRequest(), so
- // effectively only reader_->launch() can lead to an early call to
- // Bridge::terminate
+ rtl::Reference< Reader > r(new Reader(this));
+ rtl::Reference< Writer > w(new Writer(this));
+ {
+ osl::MutexGuard g(mutex_);
+ assert(threadPool_ == 0 && !writer_.is() && !reader_.is());
+ threadPool_ = uno_threadpool_create();
+ assert(threadPool_ != 0);
+ reader_ = r;
+ writer_ = w;
+ }
+ // It is important to call reader_->launch() last here; both
+ // Writer::execute and Reader::execute can call Bridge::terminate, but
+ // Writer::execute is initially blocked in unblocked_.wait() until
+ // Reader::execute has called bridge_->sendRequestChangeRequest(), so
+ // effectively only reader_->launch() can lead to an early call to
+ // Bridge::terminate
+ w->launch();
+ r->launch();
}
void Bridge::terminate() {
+ uno_ThreadPool tp;
rtl::Reference< Reader > r;
rtl::Reference< Writer > w;
Listeners ls;
{
osl::MutexGuard g(mutex_);
if (terminated_) {
return;
}
+ tp = threadPool_;
std::swap(reader_, r);
std::swap(writer_, w);
ls.swap(listeners_);
terminated_ = true;
}
try {
@@ -270,14 +277,14 @@ void Bridge::terminate() {
SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\'');
}
assert(w.is());
w->stop();
joinThread(r.get());
joinThread(w.get());
- assert(threadPool_ != 0);
- uno_threadpool_dispose(threadPool_);
+ assert(tp != 0);
+ uno_threadpool_dispose(tp);
Stubs s;
{
osl::MutexGuard g(mutex_);
s.swap(stubs_);
}
for (Stubs::iterator i(s.begin()); i != s.end(); ++i) {
@@ -298,12 +305,13 @@ void Bridge::terminate() {
static_cast< cppu::OWeakObject * >(this)));
} catch (const css::uno::RuntimeException & e) {
SAL_WARN(
"binaryurp", "caught runtime exception '" << e.Message << '\'');
}
}
+ uno_threadpool_destroy(tp);
}
css::uno::Reference< css::connection::XConnection > Bridge::getConnection()
const
{
return connection_;
@@ -327,13 +335,14 @@ BinaryAny Bridge::mapCppToBinaryAny(css::uno::Any const & cppAny) {
out.get(), &in,
css::uno::TypeDescription(cppu::UnoType< css::uno::Any >::get()).get(),
cppToBinaryMapping_.get());
return out;
}
-uno_ThreadPool Bridge::getThreadPool() const {
+uno_ThreadPool Bridge::getThreadPool() {
+ osl::MutexGuard g(mutex_);
assert(threadPool_ != 0);
return threadPool_;
}
rtl::Reference< Writer > Bridge::getWriter() {
osl::MutexGuard g(mutex_);
@@ -568,24 +577,25 @@ bool Bridge::makeCall(
rtl::OUString const & oid, css::uno::TypeDescription const & member,
bool setter, std::vector< BinaryAny > const & inArguments,
BinaryAny * returnValue, std::vector< BinaryAny > * outArguments)
{
std::auto_ptr< IncomingReply > resp;
{
- AttachThread att(threadPool_);
+ uno_ThreadPool tp = getThreadPool();
+ AttachThread att(tp);
PopOutgoingRequest pop(
outgoingRequests_, att.getTid(),
OutgoingRequest(OutgoingRequest::KIND_NORMAL, member, setter));
sendRequest(
att.getTid(), oid, css::uno::TypeDescription(), member,
inArguments);
pop.clear();
incrementCalls(true);
incrementActiveCalls();
void * job;
- uno_threadpool_enter(threadPool_, &job);
+ uno_threadpool_enter(tp, &job);
resp.reset(static_cast< IncomingReply * >(job));
decrementActiveCalls();
decrementCalls();
}
if (resp.get() == 0) {
throw css::lang::DisposedException(
@@ -809,14 +819,14 @@ void Bridge::setCurrentContextMode() {
bool Bridge::isCurrentContextMode() {
osl::MutexGuard g(mutex_);
return currentContextMode_;
}
Bridge::~Bridge() {
- if (threadPool_ != 0) {
- uno_threadpool_destroy(threadPool_);
+ if (getThreadPool() != 0) {
+ terminate();
}
}
css::uno::Reference< css::uno::XInterface > Bridge::getInstance(
rtl::OUString const & sInstanceName) throw (css::uno::RuntimeException)
{
@@ -937,13 +947,13 @@ void Bridge::sendProtPropRequest(
pop.clear();
}
void Bridge::makeReleaseCall(
rtl::OUString const & oid, css::uno::TypeDescription const & type)
{
- AttachThread att(threadPool_);
+ AttachThread att(getThreadPool());
sendRequest(
att.getTid(), oid, type,
css::uno::TypeDescription(
rtl::OUString(
RTL_CONSTASCII_USTRINGPARAM(
"com.sun.star.uno.XInterface::release"))),
diff --git a/binaryurp/source/bridge.hxx b/binaryurp/source/bridge.hxx
index cf281f2febfd..8d667897d253 100644
--- a/binaryurp/source/bridge.hxx
+++ b/binaryurp/source/bridge.hxx
@@ -103,13 +103,13 @@ public:
getProvider() const;
com::sun::star::uno::Mapping & getCppToBinaryMapping();
BinaryAny mapCppToBinaryAny(com::sun::star::uno::Any const & cppAny);
- uno_ThreadPool getThreadPool() const;
+ uno_ThreadPool getThreadPool();
rtl::Reference< Writer > getWriter();
com::sun::star::uno::UnoInterfaceReference registerIncomingInterface(
rtl::OUString const & oid,
com::sun::star::uno::TypeDescription const & type);
@@ -255,17 +255,17 @@ private:
com::sun::star::uno::Mapping binaryToCppMapping_;
rtl::ByteSequence protPropTid_;
rtl::OUString protPropOid_;
com::sun::star::uno::TypeDescription protPropType_;
com::sun::star::uno::TypeDescription protPropRequest_;
com::sun::star::uno::TypeDescription protPropCommit_;
- uno_ThreadPool threadPool_;
OutgoingRequests outgoingRequests_;
osl::Mutex mutex_;
Listeners listeners_;
+ uno_ThreadPool threadPool_;
rtl::Reference< Writer > writer_;
rtl::Reference< Reader > reader_;
bool currentContextMode_;
Stubs stubs_;
std::size_t proxies_;
std::size_t calls_;
diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx
index 9dda867a0878..d14e26006e04 100644
--- a/cppu/source/threadpool/threadpool.cxx
+++ b/cppu/source/threadpool/threadpool.cxx
@@ -23,13 +23,16 @@
* version 3 along with OpenOffice.org. If not, see
* <http://www.openoffice.org/license.html>
* for a copy of the LGPLv3 License.
*
************************************************************************/
+#include "sal/config.h"
+
#include <boost/unordered_map.hpp>
+#include <cassert>
#include <stdio.h>
#include <osl/diagnose.h>
#include <osl/mutex.hxx>
#include <osl/thread.h>
#include <rtl/instance.hxx>
@@ -70,13 +73,13 @@ namespace cppu_threadpool
void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
{
MutexGuard guard( m_mutex );
m_lst.push_back( nDisposeId );
}
- void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId )
+ void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
{
MutexGuard guard( m_mutex );
for( DisposedCallerList::iterator ii = m_lst.begin() ;
ii != m_lst.end() ;
++ ii )
{
@@ -169,15 +172,15 @@ namespace cppu_threadpool
}
}
ThreadAdmin::getInstance()->join();
}
}
- void ThreadPool::stopDisposing( sal_Int64 nDisposeId )
+ void ThreadPool::destroy( sal_Int64 nDisposeId )
{
- m_DisposedCallerAdmin->stopDisposing( nDisposeId );
+ m_DisposedCallerAdmin->destroy( nDisposeId );
}
/******************
* This methods lets the thread wait a certain amount of time. If within this timespan
* a new request comes in, this thread is reused. This is done only to improve performance,
* it is not required for threadpool functionality.
@@ -477,31 +480,38 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
reinterpret_cast< sal_IntPtr >(hPool)) );
}
extern "C" void SAL_CALL
uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
{
- ThreadPool::getInstance()->stopDisposing(
+ assert(hPool != 0);
+
+ ThreadPool::getInstance()->destroy(
sal::static_int_cast< sal_Int64 >(
reinterpret_cast< sal_IntPtr >(hPool)) );
- if( hPool )
+ bool empty;
{
- // special treatment for 0 !
OSL_ASSERT( g_pThreadpoolHashSet );
MutexGuard guard( Mutex::getGlobalMutex() );
ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
g_pThreadpoolHashSet->erase( ii );
delete hPool;
- if( g_pThreadpoolHashSet->empty() )
+ empty = g_pThreadpoolHashSet->empty();
+ if( empty )
{
delete g_pThreadpoolHashSet;
g_pThreadpoolHashSet = 0;
}
}
+
+ if( empty )
+ {
+ uno_threadpool_dispose( 0 );
+ }
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx
index 498ea4a02cd3..8b64ed18d682 100644
--- a/cppu/source/threadpool/threadpool.hxx
+++ b/cppu/source/threadpool/threadpool.hxx
@@ -87,13 +87,13 @@ namespace cppu_threadpool {
public:
~DisposedCallerAdmin();
static DisposedCallerAdminHolder getInstance();
void dispose( sal_Int64 nDisposeId );
- void stopDisposing( sal_Int64 nDisposeId );
+ void destroy( sal_Int64 nDisposeId );
sal_Bool isDisposed( sal_Int64 nDisposeId );
private:
::osl::Mutex m_mutex;
DisposedCallerList m_lst;
};
@@ -106,13 +106,13 @@ namespace cppu_threadpool {
public:
ThreadPool();
~ThreadPool();
static ThreadPoolHolder getInstance();
void dispose( sal_Int64 nDisposeId );
- void stopDisposing( sal_Int64 nDisposeId );
+ void destroy( sal_Int64 nDisposeId );
void addJob( const ByteSequence &aThreadId,
sal_Bool bAsynchron,
void *pThreadSpecificData,
RequestFun * doRequest );