summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephan Bergmann <sbergman@redhat.com>2012-05-23 09:42:37 +0200
committerCaolán McNamara <caolanm@redhat.com>2012-05-24 07:52:29 +0100
commitb7721ccca2f76c52dd6c6bb9b8f673fb276fea52 (patch)
tree2d25739bc6bd4c46bdf2c0fb48b2548255772c52
parentde4b790fa928f92bf40b2e2b2c2acb622f9ebd8b (diff)
Better fix for ThreadPool/ORequestThread life cycle
This is a follow up to d015384e1d98fe77fd59339044f58efb1ab9fb25 "Fixed ThreadPool (and dependent ORequestThread) life cycle" that still had some problems: * First, if Bridge::terminate was first entered from the reader or writer thread, it would not join on that thread, so that thread could still be running during exit. That has been addressed by giving Bridge::dispose new semantics: It waits until both Bridge::terminate has completed (even if that was called from a different thread) and all spawned threads (reader, writer, ORequestThread workers) have been joined. (This implies that Bridge::dispose must not be called from such a thread, to avoid deadlock.) * Second, if Bridge::terminate was first entered from an ORequestThread, the call to uno_threadpool_dispose(0) to join on all such worker threads could deadlock. That has been addressed by making the last call to uno_threadpool_destroy wait to join on all worker threads, and by calling uno_threadpool_destroy only from the final Bridge::terminate (from Bridge::dispose), to avoid deadlock. (The special semantics of uno_threadpool_dispose(0) are no longer needed and have been removed, as they conflicted with the fix for the third problem below.) * Third, once uno_threadpool_destroy had called uno_threadpool_dispose(0), the ThreadAdmin singleton had been disposed, so no new remote bridges could successfully be created afterwards. That has been addressed by making ThreadAdmin a member of ThreadPool, and making (only) those uno_ThreadPool handles with overlapping life spans share one ThreadPool instance (which thus is no longer a singleton, either). Additionally, ORequestThread has been made more robust (in the style of salhelper::Thread) to avoid races. (cherry picked from commit 2fa2660b55a34a5780f9ea8dbbbe92d05dc9a818) Conflicts: binaryurp/source/bridge.cxx cppu/Library_cppu.mk cppu/source/threadpool/threadpool.cxx Change-Id: I2cbd1b3f9aecc1bf4649e482d2c22b33b471788f Signed-off-by: Caolán McNamara <caolanm@redhat.com>
-rw-r--r--binaryurp/source/bridge.cxx205
-rw-r--r--binaryurp/source/bridge.hxx25
-rw-r--r--binaryurp/source/bridgefactory.cxx2
-rw-r--r--binaryurp/source/incomingrequest.cxx2
-rw-r--r--binaryurp/source/reader.cxx2
-rw-r--r--binaryurp/source/writer.cxx2
-rw-r--r--cppu/inc/uno/threadpool.h8
-rw-r--r--cppu/source/threadpool/thread.cxx184
-rw-r--r--cppu/source/threadpool/thread.hxx52
-rw-r--r--cppu/source/threadpool/threadpool.cxx124
-rw-r--r--cppu/source/threadpool/threadpool.hxx43
-rw-r--r--cppu/util/makefile.mk2
-rw-r--r--cpputools/source/unoexe/unoexe.cxx4
13 files changed, 372 insertions, 283 deletions
diff --git a/binaryurp/source/bridge.cxx b/binaryurp/source/bridge.cxx
index f591fe0e9e43..13987fe3bd51 100644
--- a/binaryurp/source/bridge.cxx
+++ b/binaryurp/source/bridge.cxx
@@ -105,11 +105,9 @@ extern "C" void SAL_CALL freeProxyCallback(uno_ExtEnvironment *, void * pProxy)
105 static_cast< Proxy * >(pProxy)->do_free(); 105 static_cast< Proxy * >(pProxy)->do_free();
106} 106}
107 107
108void joinThread(osl::Thread * thread) { 108bool isThread(osl::Thread * thread) {
109 assert(thread != 0); 109 assert(thread != 0);
110 if (thread->getIdentifier() != osl::Thread::getCurrentIdentifier()) { 110 return osl::Thread::getCurrentIdentifier() == thread->getIdentifier();
111 thread->join();
112 }
113} 111}
114 112
115class AttachThread: private boost::noncopyable { 113class AttachThread: private boost::noncopyable {
@@ -213,8 +211,8 @@ Bridge::Bridge(
213 rtl::OUString( 211 rtl::OUString(
214 RTL_CONSTASCII_USTRINGPARAM( 212 RTL_CONSTASCII_USTRINGPARAM(
215 "com.sun.star.bridge.XProtocolProperties::commitChange"))), 213 "com.sun.star.bridge.XProtocolProperties::commitChange"))),
216 threadPool_(0), currentContextMode_(false), proxies_(0), calls_(0), 214 state_(STATE_INITIAL), threadPool_(0), currentContextMode_(false),
217 normalCall_(false), activeCalls_(0), terminated_(false), 215 proxies_(0), calls_(0), normalCall_(false), activeCalls_(0),
218 mode_(MODE_REQUESTED) 216 mode_(MODE_REQUESTED)
219{ 217{
220 assert(factory.is() && connection.is()); 218 assert(factory.is() && connection.is());
@@ -238,11 +236,14 @@ void Bridge::start() {
238 rtl::Reference< Writer > w(new Writer(this)); 236 rtl::Reference< Writer > w(new Writer(this));
239 { 237 {
240 osl::MutexGuard g(mutex_); 238 osl::MutexGuard g(mutex_);
241 assert(threadPool_ == 0 && !writer_.is() && !reader_.is()); 239 assert(
240 state_ == STATE_INITIAL && threadPool_ == 0 && !writer_.is() &&
241 !reader_.is());
242 threadPool_ = uno_threadpool_create(); 242 threadPool_ = uno_threadpool_create();
243 assert(threadPool_ != 0); 243 assert(threadPool_ != 0);
244 reader_ = r; 244 reader_ = r;
245 writer_ = w; 245 writer_ = w;
246 state_ = STATE_STARTED;
246 } 247 }
247 // It is important to call reader_->create() last here; both 248 // It is important to call reader_->create() last here; both
248 // Writer::execute and Reader::execute can call Bridge::terminate, but 249 // Writer::execute and Reader::execute can call Bridge::terminate, but
@@ -254,60 +255,117 @@ void Bridge::start() {
254 r->create(); 255 r->create();
255} 256}
256 257
257void Bridge::terminate() { 258void Bridge::terminate(bool final) {
258 uno_ThreadPool tp; 259 uno_ThreadPool tp;
259 rtl::Reference< Reader > r; 260 // Make sure function-local variables (Stubs s, etc.) are destroyed before
260 rtl::Reference< Writer > w; 261 // the final uno_threadpool_destroy/threadPool_ = 0:
261 Listeners ls;
262 { 262 {
263 osl::MutexGuard g(mutex_); 263 rtl::Reference< Reader > r;
264 if (terminated_) { 264 rtl::Reference< Writer > w;
265 return; 265 bool joinW;
266 Listeners ls;
267 {
268 osl::ClearableMutexGuard g(mutex_);
269 switch (state_) {
270 case STATE_INITIAL: // via ~Bridge -> dispose -> terminate
271 case STATE_FINAL:
272 return;
273 case STATE_STARTED:
274 break;
275 case STATE_TERMINATED:
276 if (final) {
277 g.clear();
278 terminated_.wait();
279 {
280 osl::MutexGuard g2(mutex_);
281 tp = threadPool_;
282 threadPool_ = 0;
283 assert(!(reader_.is() && isThread(reader_.get())));
284 std::swap(reader_, r);
285 assert(!(writer_.is() && isThread(writer_.get())));
286 std::swap(writer_, w);
287 state_ = STATE_FINAL;
288 }
289 assert(!(r.is() && w.is()));
290 if (r.is()) {
291 r->join();
292 } else if (w.is()) {
293 w->join();
294 }
295 if (tp != 0) {
296 uno_threadpool_destroy(tp);
297 }
298 }
299 return;
300 }
301 tp = threadPool_;
302 assert(!(final && isThread(reader_.get())));
303 if (!isThread(reader_.get())) {
304 std::swap(reader_, r);
305 }
306 w = writer_;
307 joinW = !isThread(writer_.get());
308 assert(!final || joinW);
309 if (joinW) {
310 writer_.clear();
311 }
312 ls.swap(listeners_);
313 state_ = final ? STATE_FINAL : STATE_TERMINATED;
314 }
315 try {
316 connection_->close();
317 } catch (const css::io::IOException & e) {
318 SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\'');
319 }
320 assert(w.is());
321 w->stop();
322 if (r.is()) {
323 r->join();
324 }
325 if (joinW) {
326 w->join();
327 }
328 assert(tp != 0);
329 uno_threadpool_dispose(tp);
330 Stubs s;
331 {
332 osl::MutexGuard g(mutex_);
333 s.swap(stubs_);
334 }
335 for (Stubs::iterator i(s.begin()); i != s.end(); ++i) {
336 for (Stub::iterator j(i->second.begin()); j != i->second.end(); ++j)
337 {
338 SAL_INFO(
339 "binaryurp",
340 "stub '" << i->first << "', '" << toString(j->first)
341 << "' still mapped at Bridge::terminate");
342 binaryUno_.get()->pExtEnv->revokeInterface(
343 binaryUno_.get()->pExtEnv, j->second.object.get());
344 }
345 }
346 factory_->removeBridge(this);
347 for (Listeners::iterator i(ls.begin()); i != ls.end(); ++i) {
348 try {
349 (*i)->disposing(
350 css::lang::EventObject(
351 static_cast< cppu::OWeakObject * >(this)));
352 } catch (const css::uno::RuntimeException & e) {
353 SAL_WARN(
354 "binaryurp",
355 "caught runtime exception '" << e.Message << '\'');
356 }
266 } 357 }
267 tp = threadPool_;
268 std::swap(reader_, r);
269 std::swap(writer_, w);
270 ls.swap(listeners_);
271 terminated_ = true;
272 } 358 }
273 try { 359 if (final) {
274 connection_->close(); 360 uno_threadpool_destroy(tp);
275 } catch (const css::io::IOException & e) { 361 }
276 SAL_INFO("binaryurp", "caught IO exception '" << e.Message << '\'');
277 }
278 assert(w.is());
279 w->stop();
280 joinThread(r.get());
281 joinThread(w.get());
282 assert(tp != 0);
283 uno_threadpool_dispose(tp);
284 Stubs s;
285 { 362 {
286 osl::MutexGuard g(mutex_); 363 osl::MutexGuard g(mutex_);
287 s.swap(stubs_); 364 if (final) {
288 } 365 threadPool_ = 0;
289 for (Stubs::iterator i(s.begin()); i != s.end(); ++i) {
290 for (Stub::iterator j(i->second.begin()); j != i->second.end(); ++j) {
291 SAL_INFO(
292 "binaryurp",
293 "stub '" << i->first << "', '" << toString(j->first)
294 << "' still mapped at Bridge::terminate");
295 binaryUno_.get()->pExtEnv->revokeInterface(
296 binaryUno_.get()->pExtEnv, j->second.object.get());
297 } 366 }
298 } 367 }
299 factory_->removeBridge(this); 368 terminated_.set();
300 for (Listeners::iterator i(ls.begin()); i != ls.end(); ++i) {
301 try {
302 (*i)->disposing(
303 css::lang::EventObject(
304 static_cast< cppu::OWeakObject * >(this)));
305 } catch (const css::uno::RuntimeException & e) {
306 SAL_WARN(
307 "binaryurp", "caught runtime exception '" << e.Message << '\'');
308 }
309 }
310 uno_threadpool_destroy(tp);
311} 369}
312 370
313css::uno::Reference< css::connection::XConnection > Bridge::getConnection() 371css::uno::Reference< css::connection::XConnection > Bridge::getConnection()
@@ -339,19 +397,14 @@ BinaryAny Bridge::mapCppToBinaryAny(css::uno::Any const & cppAny) {
339 397
340uno_ThreadPool Bridge::getThreadPool() { 398uno_ThreadPool Bridge::getThreadPool() {
341 osl::MutexGuard g(mutex_); 399 osl::MutexGuard g(mutex_);
400 checkDisposed();
342 assert(threadPool_ != 0); 401 assert(threadPool_ != 0);
343 return threadPool_; 402 return threadPool_;
344} 403}
345 404
346rtl::Reference< Writer > Bridge::getWriter() { 405rtl::Reference< Writer > Bridge::getWriter() {
347 osl::MutexGuard g(mutex_); 406 osl::MutexGuard g(mutex_);
348 if (terminated_) { 407 checkDisposed();
349 throw css::lang::DisposedException(
350 rtl::OUString(
351 RTL_CONSTASCII_USTRINGPARAM(
352 "Binary URP bridge already disposed")),
353 static_cast< cppu::OWeakObject * >(this));
354 }
355 assert(writer_.is()); 408 assert(writer_.is());
356 return writer_; 409 return writer_;
357} 410}
@@ -822,9 +875,15 @@ bool Bridge::isCurrentContextMode() {
822} 875}
823 876
824Bridge::~Bridge() { 877Bridge::~Bridge() {
825 if (getThreadPool() != 0) { 878#if OSL_DEBUG_LEVEL > 0
826 terminate(); 879 {
880 osl::MutexGuard g(mutex_);
881 SAL_WARN_IF(
882 state_ == STATE_STARTED || state_ == STATE_TERMINATED, "binaryurp",
883 "undisposed bridge, potential deadlock ahead");
827 } 884 }
885#endif
886 dispose();
828} 887}
829 888
830css::uno::Reference< css::uno::XInterface > Bridge::getInstance( 889css::uno::Reference< css::uno::XInterface > Bridge::getInstance(
@@ -885,7 +944,11 @@ rtl::OUString Bridge::getDescription() throw (css::uno::RuntimeException) {
885} 944}
886 945
887void Bridge::dispose() throw (css::uno::RuntimeException) { 946void Bridge::dispose() throw (css::uno::RuntimeException) {
888 terminate(); 947 // For terminate(true) not to deadlock, an external protocol must ensure
948 // that dispose is not called from a thread pool worker thread (that dispose
949 // is never called from the reader or writer thread is already ensured
950 // internally):
951 terminate(true);
889 // OOo expects dispose to not return while there are still remote calls in 952 // OOo expects dispose to not return while there are still remote calls in
890 // progress; an external protocol must ensure that dispose is not called 953 // progress; an external protocol must ensure that dispose is not called
891 // from within an incoming or outgoing remote call, as passive_.wait() would 954 // from within an incoming or outgoing remote call, as passive_.wait() would
@@ -900,7 +963,8 @@ void Bridge::addEventListener(
900 assert(xListener.is()); 963 assert(xListener.is());
901 { 964 {
902 osl::MutexGuard g(mutex_); 965 osl::MutexGuard g(mutex_);
903 if (!terminated_) { 966 assert(state_ != STATE_INITIAL);
967 if (state_ == STATE_STARTED) {
904 listeners_.push_back(xListener); 968 listeners_.push_back(xListener);
905 return; 969 return;
906 } 970 }
@@ -995,7 +1059,18 @@ void Bridge::terminateWhenUnused(bool unused) {
995 // That the current thread considers the bridge unused implies that it 1059 // That the current thread considers the bridge unused implies that it
996 // is not within an incoming or outgoing remote call (so calling 1060 // is not within an incoming or outgoing remote call (so calling
997 // terminate cannot lead to deadlock): 1061 // terminate cannot lead to deadlock):
998 terminate(); 1062 terminate(false);
1063 }
1064}
1065
1066void Bridge::checkDisposed() {
1067 assert(state_ != STATE_INITIAL);
1068 if (state_ != STATE_STARTED) {
1069 throw css::lang::DisposedException(
1070 rtl::OUString(
1071 RTL_CONSTASCII_USTRINGPARAM(
1072 "Binary URP bridge already disposed")),
1073 static_cast< cppu::OWeakObject * >(this));
999 } 1074 }
1000} 1075}
1001 1076
diff --git a/binaryurp/source/bridge.hxx b/binaryurp/source/bridge.hxx
index 8d667897d253..3ffbfbaeb43b 100644
--- a/binaryurp/source/bridge.hxx
+++ b/binaryurp/source/bridge.hxx
@@ -93,8 +93,11 @@ public:
93 void start(); 93 void start();
94 94
95 // Internally waits for all incoming and outgoing remote calls to terminate, 95 // Internally waits for all incoming and outgoing remote calls to terminate,
96 // so must not be called from within such a call: 96 // so must not be called from within such a call; when final is true, also
97 void terminate(); 97 // joins all remaining threads (reader, writer, and worker threads from the
98 // thread pool), so must not be called with final set to true from such a
99 // thread:
100 void terminate(bool final);
98 101
99 com::sun::star::uno::Reference< com::sun::star::connection::XConnection > 102 com::sun::star::uno::Reference< com::sun::star::connection::XConnection >
100 getConnection() const; 103 getConnection() const;
@@ -228,6 +231,9 @@ private:
228 231
229 void terminateWhenUnused(bool unused); 232 void terminateWhenUnused(bool unused);
230 233
234 // Must only be called with mutex_ locked:
235 void checkDisposed();
236
231 typedef 237 typedef
232 std::list< 238 std::list<
233 com::sun::star::uno::Reference< 239 com::sun::star::uno::Reference<
@@ -240,6 +246,8 @@ private:
240 246
241 typedef std::map< rtl::OUString, Stub > Stubs; 247 typedef std::map< rtl::OUString, Stub > Stubs;
242 248
249 enum State { STATE_INITIAL, STATE_STARTED, STATE_TERMINATED, STATE_FINAL };
250
243 enum Mode { 251 enum Mode {
244 MODE_REQUESTED, MODE_REPLY_MINUS1, MODE_REPLY_0, MODE_REPLY_1, 252 MODE_REQUESTED, MODE_REPLY_MINUS1, MODE_REPLY_0, MODE_REPLY_1,
245 MODE_WAIT, MODE_NORMAL, MODE_NORMAL_WAIT }; 253 MODE_WAIT, MODE_NORMAL, MODE_NORMAL_WAIT };
@@ -259,8 +267,15 @@ private:
259 com::sun::star::uno::TypeDescription protPropRequest_; 267 com::sun::star::uno::TypeDescription protPropRequest_;
260 com::sun::star::uno::TypeDescription protPropCommit_; 268 com::sun::star::uno::TypeDescription protPropCommit_;
261 OutgoingRequests outgoingRequests_; 269 OutgoingRequests outgoingRequests_;
270 osl::Condition passive_;
271 // to guarantee that passive_ is eventually set (to avoid deadlock, see
272 // dispose), activeCalls_ only counts those calls for which it can be
273 // guaranteed that incrementActiveCalls is indeed followed by
274 // decrementActiveCalls, without an intervening exception
275 osl::Condition terminated_;
262 276
263 osl::Mutex mutex_; 277 osl::Mutex mutex_;
278 State state_;
264 Listeners listeners_; 279 Listeners listeners_;
265 uno_ThreadPool threadPool_; 280 uno_ThreadPool threadPool_;
266 rtl::Reference< Writer > writer_; 281 rtl::Reference< Writer > writer_;
@@ -271,12 +286,6 @@ private:
271 std::size_t calls_; 286 std::size_t calls_;
272 bool normalCall_; 287 bool normalCall_;
273 std::size_t activeCalls_; 288 std::size_t activeCalls_;
274 osl::Condition passive_;
275 // to guarantee that passive_ is eventually set (to avoid deadlock, see
276 // dispose), activeCalls_ only counts those calls for which it can be
277 // guaranteed that incrementActiveCalls is indeed followed by
278 // decrementActiveCalls, without an intervening exception
279 bool terminated_;
280 289
281 // Only accessed from reader_ thread: 290 // Only accessed from reader_ thread:
282 Mode mode_; 291 Mode mode_;
diff --git a/binaryurp/source/bridgefactory.cxx b/binaryurp/source/bridgefactory.cxx
index 5c9105585916..8f4caa64e17d 100644
--- a/binaryurp/source/bridgefactory.cxx
+++ b/binaryurp/source/bridgefactory.cxx
@@ -212,7 +212,7 @@ static cppu::ImplementationEntry const services[] = {
212 { &binaryurp::BridgeFactory::static_create, 212 { &binaryurp::BridgeFactory::static_create,
213 &binaryurp::BridgeFactory::static_getImplementationName, 213 &binaryurp::BridgeFactory::static_getImplementationName,
214 &binaryurp::BridgeFactory::static_getSupportedServiceNames, 214 &binaryurp::BridgeFactory::static_getSupportedServiceNames,
215 &cppu::createSingleComponentFactory, 0, 0 }, 215 &cppu::createOneInstanceComponentFactory, 0, 0 },
216 { 0, 0, 0, 0, 0, 0 } 216 { 0, 0, 0, 0, 0, 0 }
217}; 217};
218 218
diff --git a/binaryurp/source/incomingrequest.cxx b/binaryurp/source/incomingrequest.cxx
index 431c88505ad1..83b0030623e7 100644
--- a/binaryurp/source/incomingrequest.cxx
+++ b/binaryurp/source/incomingrequest.cxx
@@ -123,7 +123,7 @@ void IncomingRequest::execute() const {
123 } catch (const std::exception & e) { 123 } catch (const std::exception & e) {
124 OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what()); 124 OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what());
125 } 125 }
126 bridge_->terminate(); 126 bridge_->terminate(false);
127 } else { 127 } else {
128 if (isExc) { 128 if (isExc) {
129 OSL_TRACE(OSL_LOG_PREFIX "oneway method raised exception"); 129 OSL_TRACE(OSL_LOG_PREFIX "oneway method raised exception");
diff --git a/binaryurp/source/reader.cxx b/binaryurp/source/reader.cxx
index c052fadc5629..47bdfbf065fc 100644
--- a/binaryurp/source/reader.cxx
+++ b/binaryurp/source/reader.cxx
@@ -150,7 +150,7 @@ void Reader::run() {
150 } catch (const std::exception & e) { 150 } catch (const std::exception & e) {
151 SAL_WARN("binaryurp", "caught C++ exception '" << e.what() << '\''); 151 SAL_WARN("binaryurp", "caught C++ exception '" << e.what() << '\'');
152 } 152 }
153 bridge_->terminate(); 153 bridge_->terminate(false);
154} 154}
155 155
156void Reader::onTerminated() { 156void Reader::onTerminated() {
diff --git a/binaryurp/source/writer.cxx b/binaryurp/source/writer.cxx
index 5aca57b108eb..69825e13bc37 100644
--- a/binaryurp/source/writer.cxx
+++ b/binaryurp/source/writer.cxx
@@ -196,7 +196,7 @@ void Writer::run() {
196 } catch (const std::exception & e) { 196 } catch (const std::exception & e) {
197 OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what()); 197 OSL_TRACE(OSL_LOG_PREFIX "caught C++ exception '%s'", e.what());
198 } 198 }
199 bridge_->terminate(); 199 bridge_->terminate(false);
200} 200}
201 201
202void Writer::onTerminated() { 202void Writer::onTerminated() {
diff --git a/cppu/inc/uno/threadpool.h b/cppu/inc/uno/threadpool.h
index 0c647442e377..77bb712e53b4 100644
--- a/cppu/inc/uno/threadpool.h
+++ b/cppu/inc/uno/threadpool.h
@@ -166,10 +166,6 @@ uno_threadpool_putJob(
166 return immeadiatly with *ppJob == 0. 166 return immeadiatly with *ppJob == 0.
167 167
168 @param hPool The handle to be disposed. 168 @param hPool The handle to be disposed.
169 In case, hPool is 0, this function joins on all threads created
170 by the threadpool administration. This may e.g. used to ensure, that
171 no threads are inside the cppu library anymore, in case it needs to get
172 unloaded.
173 169
174 This function is called i.e. by a bridge, that is forced to dispose itself. 170 This function is called i.e. by a bridge, that is forced to dispose itself.
175 */ 171 */
@@ -180,6 +176,10 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C();
180/** Releases the previously with uno_threadpool_create() created handle. 176/** Releases the previously with uno_threadpool_create() created handle.
181 The handle thus becomes invalid. It is an error to use the handle after 177 The handle thus becomes invalid. It is an error to use the handle after
182 uno_threadpool_destroy(). 178 uno_threadpool_destroy().
179
180 A call to uno_threadpool_destroy can synchronously join on spawned worker
181 threads, so this function must never be called from such a worker thread.
182
183 @see uno_threadpool_create() 183 @see uno_threadpool_create()
184 */ 184 */
185void SAL_CALL 185void SAL_CALL
diff --git a/cppu/source/threadpool/thread.cxx b/cppu/source/threadpool/thread.cxx
index cc22a453c79d..12ea09a131ee 100644
--- a/cppu/source/threadpool/thread.cxx
+++ b/cppu/source/threadpool/thread.cxx
@@ -33,7 +33,6 @@
33#include <com/sun/star/lang/DisposedException.hpp> 33#include <com/sun/star/lang/DisposedException.hpp>
34#include <com/sun/star/uno/Reference.hxx> 34#include <com/sun/star/uno/Reference.hxx>
35#include <com/sun/star/uno/XInterface.hpp> 35#include <com/sun/star/uno/XInterface.hpp>
36#include <rtl/instance.hxx>
37#include <rtl/ustring.h> 36#include <rtl/ustring.h>
38#include <rtl/ustring.hxx> 37#include <rtl/ustring.hxx>
39 38
@@ -48,17 +47,6 @@ namespace css = com::sun::star;
48} 47}
49 48
50using namespace osl; 49using namespace osl;
51extern "C" {
52
53void SAL_CALL cppu_requestThreadWorker( void *pVoid )
54{
55 ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
56
57 pThread->run();
58 pThread->onTerminated();
59}
60
61}
62 50
63namespace cppu_threadpool { 51namespace cppu_threadpool {
64 52
@@ -75,7 +63,7 @@ namespace cppu_threadpool {
75#endif 63#endif
76 } 64 }
77 65
78 void ThreadAdmin::add( ORequestThread *p ) 66 void ThreadAdmin::add( rtl::Reference< ORequestThread > const & p )
79 { 67 {
80 MutexGuard aGuard( m_mutex ); 68 MutexGuard aGuard( m_mutex );
81 if( m_disposed ) 69 if( m_disposed )
@@ -90,12 +78,19 @@ namespace cppu_threadpool {
90 m_lst.push_back( p ); 78 m_lst.push_back( p );
91 } 79 }
92 80
93 void ThreadAdmin::remove( ORequestThread * p ) 81 void ThreadAdmin::remove_locked( rtl::Reference< ORequestThread > const & p )
82 {
83 ::std::list< rtl::Reference< ORequestThread > >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
84 if( ii != m_lst.end() )
85 {
86 m_lst.erase( ii );
87 }
88 }
89
90 void ThreadAdmin::remove( rtl::Reference< ORequestThread > const & p )
94 { 91 {
95 MutexGuard aGuard( m_mutex ); 92 MutexGuard aGuard( m_mutex );
96 ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); 93 remove_locked( p );
97 OSL_ASSERT( ii != m_lst.end() );
98 m_lst.erase( ii );
99 } 94 }
100 95
101 void ThreadAdmin::join() 96 void ThreadAdmin::join()
@@ -104,62 +99,34 @@ namespace cppu_threadpool {
104 MutexGuard aGuard( m_mutex ); 99 MutexGuard aGuard( m_mutex );
105 m_disposed = true; 100 m_disposed = true;
106 } 101 }
107 ORequestThread *pCurrent; 102 for (;;)
108 do
109 { 103 {
110 pCurrent = 0; 104 rtl::Reference< ORequestThread > pCurrent;
111 { 105 {
112 MutexGuard aGuard( m_mutex ); 106 MutexGuard aGuard( m_mutex );
113 if( ! m_lst.empty() ) 107 if( m_lst.empty() )
114 { 108 {
115 pCurrent = m_lst.front(); 109 break;
116 pCurrent->setDeleteSelf( sal_False );
117 } 110 }
111 pCurrent = m_lst.front();
112 m_lst.pop_front();
118 } 113 }
119 if ( pCurrent ) 114 pCurrent->join();
120 {
121 pCurrent->join();
122 delete pCurrent;
123 }
124 } while( pCurrent );
125 }
126
127 struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
128 {
129 ThreadAdminHolder operator () () {
130 ThreadAdminHolder aRet(new ThreadAdmin());
131 return aRet;
132 } 115 }
133 };
134
135 ThreadAdminHolder& ThreadAdmin::getInstance()
136 {
137 return theThreadAdmin::get();
138 } 116 }
139 117
140// ---------------------------------------------------------------------------------- 118// ----------------------------------------------------------------------------------
141 ORequestThread::ORequestThread( JobQueue *pQueue, 119 ORequestThread::ORequestThread( ThreadPoolHolder const &aThreadPool,
120 JobQueue *pQueue,
142 const ByteSequence &aThreadId, 121 const ByteSequence &aThreadId,
143 sal_Bool bAsynchron ) 122 sal_Bool bAsynchron )
144 : m_thread( 0 ) 123 : m_aThreadPool( aThreadPool )
145 , m_aThreadAdmin( ThreadAdmin::getInstance() )
146 , m_pQueue( pQueue ) 124 , m_pQueue( pQueue )
147 , m_aThreadId( aThreadId ) 125 , m_aThreadId( aThreadId )
148 , m_bAsynchron( bAsynchron ) 126 , m_bAsynchron( bAsynchron )
149 , m_bDeleteSelf( sal_True ) 127 {}
150 {
151 m_aThreadAdmin->add( this );
152 }
153
154
155 ORequestThread::~ORequestThread()
156 {
157 if (m_thread != 0)
158 {
159 osl_destroyThread(m_thread);
160 }
161 }
162 128
129 ORequestThread::~ORequestThread() {}
163 130
164 void ORequestThread::setTask( JobQueue *pQueue, 131 void ORequestThread::setTask( JobQueue *pQueue,
165 const ByteSequence &aThreadId, 132 const ByteSequence &aThreadId,
@@ -170,74 +137,81 @@ namespace cppu_threadpool {
170 m_bAsynchron = bAsynchron; 137 m_bAsynchron = bAsynchron;
171 } 138 }
172 139
173 sal_Bool ORequestThread::create() 140 void ORequestThread::launch()
174 { 141 {
175 OSL_ASSERT(m_thread == 0); // only one running thread per instance 142 // Assumption is that osl::Thread::create returns normally with a true
176 143 // return value iff it causes osl::Thread::run to start executing:
177 m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this); 144 acquire();
178 if ( m_thread ) 145 ThreadAdmin & rThreadAdmin = m_aThreadPool->getThreadAdmin();
179 { 146 osl::ClearableMutexGuard g(rThreadAdmin.m_mutex);
180 osl_resumeThread( m_thread ); 147 rThreadAdmin.add( this );
148 try {
149 if (!create()) {
150 throw std::runtime_error("osl::Thread::create failed");
151 }
152 } catch (...) {
153 rThreadAdmin.remove_locked( this );
154 g.clear();
155 release();
156 throw;
181 } 157 }
182
183 return m_thread != 0;
184 }
185
186 void ORequestThread::join()
187 {
188 osl_joinWithThread( m_thread );
189 } 158 }
190 159
191 void ORequestThread::onTerminated() 160 void ORequestThread::onTerminated()
192 { 161 {
193 m_aThreadAdmin->remove( this ); 162 m_aThreadPool->getThreadAdmin().remove( this );
194 if( m_bDeleteSelf ) 163 release();
195 {
196 delete this;
197 }
198 } 164 }
199 165
200 void ORequestThread::run() 166 void ORequestThread::run()
201 { 167 {
202 ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance(); 168 try
203
204 while ( m_pQueue )
205 { 169 {
206 if( ! m_bAsynchron ) 170 while ( m_pQueue )
207 { 171 {
208 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) 172 if( ! m_bAsynchron )
209 { 173 {
210 OSL_ASSERT( false ); 174 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
175 {
176 OSL_ASSERT( false );
177 }
211 } 178 }
212 }
213 179
214 while( ! m_pQueue->isEmpty() ) 180 while( ! m_pQueue->isEmpty() )
215 {
216 // Note : Oneways should not get a disposable disposeid,
217 // It does not make sense to dispose a call in this state.
218 // That's way we put it an disposeid, that can't be used otherwise.
219 m_pQueue->enter(
220 sal::static_int_cast< sal_Int64 >(
221 reinterpret_cast< sal_IntPtr >(this)),
222 sal_True );
223
224 if( m_pQueue->isEmpty() )
225 { 181 {
226 theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); 182 // Note : Oneways should not get a disposable disposeid,
227 // Note : revokeQueue might have failed because m_pQueue.isEmpty() 183 // It does not make sense to dispose a call in this state.
228 // may be false (race). 184 // That's way we put it an disposeid, that can't be used otherwise.
185 m_pQueue->enter(
186 sal::static_int_cast< sal_Int64 >(
187 reinterpret_cast< sal_IntPtr >(this)),
188 sal_True );
189
190 if( m_pQueue->isEmpty() )
191 {
192 m_aThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
193 // Note : revokeQueue might have failed because m_pQueue.isEmpty()
194 // may be false (race).
195 }
229 } 196 }
230 }
231 197
232 delete m_pQueue; 198 delete m_pQueue;
233 m_pQueue = 0; 199 m_pQueue = 0;
234 200
235 if( ! m_bAsynchron ) 201 if( ! m_bAsynchron )
236 { 202 {
237 uno_releaseIdFromCurrentThread(); 203 uno_releaseIdFromCurrentThread();
238 } 204 }
239 205
240 theThreadPool->waitInPool( this ); 206 m_aThreadPool->waitInPool( this );
207 }
208 }
209 catch (...)
210 {
211 // Work around the problem that onTerminated is not called if run
212 // throws an exception:
213 onTerminated();
214 throw;
241 } 215 }
242 } 216 }
243} 217}
diff --git a/cppu/source/threadpool/thread.hxx b/cppu/source/threadpool/thread.hxx
index a3ea45aadaed..88f3d91f8722 100644
--- a/cppu/source/threadpool/thread.hxx
+++ b/cppu/source/threadpool/thread.hxx
@@ -28,63 +28,49 @@
28#ifndef _CPPU_THREADPOOL_THREAD_HXX 28#ifndef _CPPU_THREADPOOL_THREAD_HXX
29#define _CPPU_THREADPOOL_THREAD_HXX 29#define _CPPU_THREADPOOL_THREAD_HXX
30 30
31#include <list> 31#include <osl/thread.hxx>
32#include <sal/types.h> 32#include <sal/types.h>
33 33#include <salhelper/simplereferenceobject.hxx>
34#include <osl/thread.h>
35 34
36#include "jobqueue.hxx" 35#include "jobqueue.hxx"
36#include "threadpool.hxx"
37 37
38namespace cppu_threadpool { 38namespace cppu_threadpool {
39 39
40 class JobQueue; 40 class JobQueue;
41 class ThreadAdmin;
42 typedef boost::shared_ptr<ThreadAdmin> ThreadAdminHolder;
43 41
44 //----------------------------------------- 42 //-----------------------------------------
45 // private thread class for the threadpool 43 // private thread class for the threadpool
46 // independent from vos 44 // independent from vos
47 //----------------------------------------- 45 //-----------------------------------------
48 class ORequestThread 46 class ORequestThread:
47 public salhelper::SimpleReferenceObject, public osl::Thread
49 { 48 {
50 public: 49 public:
51 ORequestThread( JobQueue * , 50 ORequestThread( ThreadPoolHolder const &aThreadPool,
51 JobQueue * ,
52 const ::rtl::ByteSequence &aThreadId, 52 const ::rtl::ByteSequence &aThreadId,
53 sal_Bool bAsynchron ); 53 sal_Bool bAsynchron );
54 ~ORequestThread(); 54 virtual ~ORequestThread();
55 55
56 void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , sal_Bool bAsynchron ); 56 void setTask( JobQueue * , const ::rtl::ByteSequence & aThreadId , sal_Bool bAsynchron );
57 57
58 sal_Bool create(); 58 void launch();
59 void join(); 59
60 void onTerminated(); 60 static inline void * operator new(std::size_t size)
61 void run(); 61 { return SimpleReferenceObject::operator new(size); }
62 inline void setDeleteSelf( sal_Bool b ) 62
63 { m_bDeleteSelf = b; } 63 static inline void operator delete(void * pointer)
64 { SimpleReferenceObject::operator delete(pointer); }
64 65
65 private: 66 private:
66 oslThread m_thread; 67 virtual void SAL_CALL run();
67 ThreadAdminHolder m_aThreadAdmin; 68 virtual void SAL_CALL onTerminated();
69
70 ThreadPoolHolder m_aThreadPool;
68 JobQueue *m_pQueue; 71 JobQueue *m_pQueue;
69 ::rtl::ByteSequence m_aThreadId; 72 ::rtl::ByteSequence m_aThreadId;
70 sal_Bool m_bAsynchron; 73 sal_Bool m_bAsynchron;
71 sal_Bool m_bDeleteSelf;
72 };
73
74 class ThreadAdmin
75 {
76 public:
77 ThreadAdmin();
78 ~ThreadAdmin ();
79 static ThreadAdminHolder &getInstance();
80 void add( ORequestThread * );
81 void remove( ORequestThread * );
82 void join();
83
84 private:
85 ::osl::Mutex m_mutex;
86 ::std::list< ORequestThread * > m_lst;
87 bool m_disposed;
88 }; 74 };
89 75
90} // end cppu_threadpool 76} // end cppu_threadpool
diff --git a/cppu/source/threadpool/threadpool.cxx b/cppu/source/threadpool/threadpool.cxx
index f9f0be6c7d03..9099b7e1981e 100644
--- a/cppu/source/threadpool/threadpool.cxx
+++ b/cppu/source/threadpool/threadpool.cxx
@@ -109,15 +109,6 @@ namespace cppu_threadpool
109 109
110 //------------------------------------------------------------------------------- 110 //-------------------------------------------------------------------------------
111 111
112 struct theThreadPool :
113 public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
114 {
115 ThreadPoolHolder operator () () {
116 ThreadPoolHolder aRet(new ThreadPool());
117 return aRet;
118 }
119 };
120
121 ThreadPool::ThreadPool() 112 ThreadPool::ThreadPool()
122 { 113 {
123 m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); 114 m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
@@ -132,46 +123,24 @@ namespace cppu_threadpool
132 } 123 }
133#endif 124#endif
134 } 125 }
135 ThreadPoolHolder ThreadPool::getInstance()
136 {
137 return theThreadPool::get();
138 }
139
140 126
141 void ThreadPool::dispose( sal_Int64 nDisposeId ) 127 void ThreadPool::dispose( sal_Int64 nDisposeId )
142 { 128 {
143 if( nDisposeId ) 129 m_DisposedCallerAdmin->dispose( nDisposeId );
144 {
145 m_DisposedCallerAdmin->dispose( nDisposeId );
146 130
147 MutexGuard guard( m_mutex ); 131 MutexGuard guard( m_mutex );
148 for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; 132 for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
149 ii != m_mapQueue.end(); 133 ii != m_mapQueue.end();
150 ++ii) 134 ++ii)
135 {
136 if( (*ii).second.first )
151 { 137 {
152 if( (*ii).second.first ) 138 (*ii).second.first->dispose( nDisposeId );
153 {
154 (*ii).second.first->dispose( nDisposeId );
155 }
156 if( (*ii).second.second )
157 {
158 (*ii).second.second->dispose( nDisposeId );
159 }
160 } 139 }
161 } 140 if( (*ii).second.second )
162 else
163 {
164 { 141 {
165 MutexGuard guard( m_mutexWaitingThreadList ); 142 (*ii).second.second->dispose( nDisposeId );
166 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
167 ii != m_lstThreads.end() ;
168 ++ ii )
169 {
170 // wake the threads up
171 osl_setCondition( (*ii)->condition );
172 }
173 } 143 }
174 ThreadAdmin::getInstance()->join();
175 } 144 }
176 } 145 }
177 146
@@ -185,7 +154,7 @@ namespace cppu_threadpool
185 * a new request comes in, this thread is reused. This is done only to improve performance, 154 * a new request comes in, this thread is reused. This is done only to improve performance,
186 * it is not required for threadpool functionality. 155 * it is not required for threadpool functionality.
187 ******************/ 156 ******************/
188 void ThreadPool::waitInPool( ORequestThread * pThread ) 157 void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
189 { 158 {
190 struct WaitingThread waitingThread; 159 struct WaitingThread waitingThread;
191 waitingThread.condition = osl_createCondition(); 160 waitingThread.condition = osl_createCondition();
@@ -201,7 +170,7 @@ namespace cppu_threadpool
201 170
202 { 171 {
203 MutexGuard guard ( m_mutexWaitingThreadList ); 172 MutexGuard guard ( m_mutexWaitingThreadList );
204 if( waitingThread.thread ) 173 if( waitingThread.thread.is() )
205 { 174 {
206 // thread wasn't reused, remove it from the list 175 // thread wasn't reused, remove it from the list
207 WaitingThreadList::iterator ii = find( 176 WaitingThreadList::iterator ii = find(
@@ -214,6 +183,21 @@ namespace cppu_threadpool
214 osl_destroyCondition( waitingThread.condition ); 183 osl_destroyCondition( waitingThread.condition );
215 } 184 }
216 185
186 void ThreadPool::joinWorkers()
187 {
188 {
189 MutexGuard guard( m_mutexWaitingThreadList );
190 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
191 ii != m_lstThreads.end() ;
192 ++ ii )
193 {
194 // wake the threads up
195 osl_setCondition( (*ii)->condition );
196 }
197 }
198 m_aThreadAdmin.join();
199 }
200
217 void ThreadPool::createThread( JobQueue *pQueue , 201 void ThreadPool::createThread( JobQueue *pQueue ,
218 const ByteSequence &aThreadId, 202 const ByteSequence &aThreadId,
219 sal_Bool bAsynchron ) 203 sal_Bool bAsynchron )
@@ -240,10 +224,9 @@ namespace cppu_threadpool
240 224
241 if( bCreate ) 225 if( bCreate )
242 { 226 {
243 ORequestThread *pThread = 227 rtl::Reference< ORequestThread > pThread(
244 new ORequestThread( pQueue , aThreadId, bAsynchron); 228 new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
245 // deletes itself ! 229 pThread->launch();
246 pThread->create();
247 } 230 }
248 } 231 }
249 232
@@ -385,6 +368,12 @@ namespace cppu_threadpool
385 } 368 }
386} 369}
387 370
371// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
372// spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
373// (within the last uno_threadpool_destroy) all worker threads spawned by that
374// ThreadPool instance are joined (which implies that uno_threadpool_destroy
375// must never be called from a worker thread); afterwards, the next call to
376// uno_threadpool_create (if any) will lead to a new ThreadPool instance.
388 377
389using namespace cppu_threadpool; 378using namespace cppu_threadpool;
390 379
@@ -415,27 +404,47 @@ struct _uno_ThreadPool
415 sal_Int32 dummy; 404 sal_Int32 dummy;
416}; 405};
417 406
407namespace {
408
409ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
410{
411 MutexGuard guard( Mutex::getGlobalMutex() );
412 assert( g_pThreadpoolHashSet != 0 );
413 ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
414 assert( i != g_pThreadpoolHashSet->end() );
415 return i->second;
416}
417
418}
419
418extern "C" uno_ThreadPool SAL_CALL 420extern "C" uno_ThreadPool SAL_CALL
419uno_threadpool_create() SAL_THROW_EXTERN_C() 421uno_threadpool_create() SAL_THROW_EXTERN_C()
420{ 422{
421 MutexGuard guard( Mutex::getGlobalMutex() ); 423 MutexGuard guard( Mutex::getGlobalMutex() );
424 ThreadPoolHolder p;
422 if( ! g_pThreadpoolHashSet ) 425 if( ! g_pThreadpoolHashSet )
423 { 426 {
424 g_pThreadpoolHashSet = new ThreadpoolHashSet(); 427 g_pThreadpoolHashSet = new ThreadpoolHashSet();
428 p = new ThreadPool;
429 }
430 else
431 {
432 assert( !g_pThreadpoolHashSet->empty() );
433 p = g_pThreadpoolHashSet->begin()->second;
425 } 434 }
426 435
427 // Just ensure that the handle is unique in the process (via heap) 436 // Just ensure that the handle is unique in the process (via heap)
428 uno_ThreadPool h = new struct _uno_ThreadPool; 437 uno_ThreadPool h = new struct _uno_ThreadPool;
429 g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); 438 g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
430 return h; 439 return h;
431} 440}
432 441
433extern "C" void SAL_CALL 442extern "C" void SAL_CALL
434uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C() 443uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
435{ 444{
436 sal_Sequence *pThreadId = 0; 445 sal_Sequence *pThreadId = 0;
437 uno_getIdOfCurrentThread( &pThreadId ); 446 uno_getIdOfCurrentThread( &pThreadId );
438 ThreadPool::getInstance()->prepare( pThreadId ); 447 getThreadPool( hPool )->prepare( pThreadId );
439 rtl_byte_sequence_release( pThreadId ); 448 rtl_byte_sequence_release( pThreadId );
440 uno_releaseIdFromCurrentThread(); 449 uno_releaseIdFromCurrentThread();
441} 450}
@@ -447,7 +456,7 @@ uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
447 sal_Sequence *pThreadId = 0; 456 sal_Sequence *pThreadId = 0;
448 uno_getIdOfCurrentThread( &pThreadId ); 457 uno_getIdOfCurrentThread( &pThreadId );
449 *ppJob = 458 *ppJob =
450 ThreadPool::getInstance()->enter( 459 getThreadPool( hPool )->enter(
451 pThreadId, 460 pThreadId,
452 sal::static_int_cast< sal_Int64 >( 461 sal::static_int_cast< sal_Int64 >(
453 reinterpret_cast< sal_IntPtr >(hPool)) ); 462 reinterpret_cast< sal_IntPtr >(hPool)) );
@@ -463,19 +472,19 @@ uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
463 472
464extern "C" void SAL_CALL 473extern "C" void SAL_CALL
465uno_threadpool_putJob( 474uno_threadpool_putJob(
466 uno_ThreadPool, 475 uno_ThreadPool hPool,
467 sal_Sequence *pThreadId, 476 sal_Sequence *pThreadId,
468 void *pJob, 477 void *pJob,
469 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), 478 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
470 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() 479 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
471{ 480{
472 ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); 481 getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest );
473} 482}
474 483
475extern "C" void SAL_CALL 484extern "C" void SAL_CALL
476uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() 485uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
477{ 486{
478 ThreadPool::getInstance()->dispose( 487 getThreadPool(hPool)->dispose(
479 sal::static_int_cast< sal_Int64 >( 488 sal::static_int_cast< sal_Int64 >(
480 reinterpret_cast< sal_IntPtr >(hPool)) ); 489 reinterpret_cast< sal_IntPtr >(hPool)) );
481} 490}
@@ -483,9 +492,8 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
483extern "C" void SAL_CALL 492extern "C" void SAL_CALL
484uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() 493uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
485{ 494{
486 assert(hPool != 0); 495 ThreadPoolHolder p( getThreadPool(hPool) );
487 496 p->destroy(
488 ThreadPool::getInstance()->destroy(
489 sal::static_int_cast< sal_Int64 >( 497 sal::static_int_cast< sal_Int64 >(
490 reinterpret_cast< sal_IntPtr >(hPool)) ); 498 reinterpret_cast< sal_IntPtr >(hPool)) );
491 499
@@ -510,7 +518,7 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
510 518
511 if( empty ) 519 if( empty )
512 { 520 {
513 uno_threadpool_dispose( 0 ); 521 p->joinWorkers();
514 } 522 }
515} 523}
516 524
diff --git a/cppu/source/threadpool/threadpool.hxx b/cppu/source/threadpool/threadpool.hxx
index 18bb47a1ff20..3ff52b362af1 100644
--- a/cppu/source/threadpool/threadpool.hxx
+++ b/cppu/source/threadpool/threadpool.hxx
@@ -25,11 +25,19 @@
25 * for a copy of the LGPLv3 License. 25 * for a copy of the LGPLv3 License.
26 * 26 *
27 ************************************************************************/ 27 ************************************************************************/
28
29#ifndef INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX
30#define INCLUDED_CPPU_SOURCE_THREADPOOL_THREADPOOL_HXX
31
32#include <list>
33
28#include <boost/unordered_map.hpp> 34#include <boost/unordered_map.hpp>
29 35
30#include <osl/conditn.h> 36#include <osl/conditn.h>
31 37
32#include <rtl/byteseq.hxx> 38#include <rtl/byteseq.hxx>
39#include <rtl/ref.hxx>
40#include <salhelper/simplereferenceobject.hxx>
33 41
34#include <boost/shared_ptr.hpp> 42#include <boost/shared_ptr.hpp>
35 43
@@ -74,7 +82,7 @@ namespace cppu_threadpool {
74 struct WaitingThread 82 struct WaitingThread
75 { 83 {
76 oslCondition condition; 84 oslCondition condition;
77 ORequestThread *thread; 85 rtl::Reference< ORequestThread > thread;
78 }; 86 };
79 87
80 typedef ::std::list < struct ::cppu_threadpool::WaitingThread * > WaitingThreadList; 88 typedef ::std::list < struct ::cppu_threadpool::WaitingThread * > WaitingThreadList;
@@ -98,15 +106,32 @@ namespace cppu_threadpool {
98 DisposedCallerList m_lst; 106 DisposedCallerList m_lst;
99 }; 107 };
100 108
109 class ThreadAdmin
110 {
111 public:
112 ThreadAdmin();
113 ~ThreadAdmin ();
114
115 void add( rtl::Reference< ORequestThread > const & );
116 void remove( rtl::Reference< ORequestThread > const & );
117 void join();
118
119 void remove_locked( rtl::Reference< ORequestThread > const & );
120 ::osl::Mutex m_mutex;
121
122 private:
123 ::std::list< rtl::Reference< ORequestThread > > m_lst;
124 bool m_disposed;
125 };
126
101 class ThreadPool; 127 class ThreadPool;
102 typedef boost::shared_ptr<ThreadPool> ThreadPoolHolder; 128 typedef rtl::Reference<ThreadPool> ThreadPoolHolder;
103 129
104 class ThreadPool 130 class ThreadPool: public salhelper::SimpleReferenceObject
105 { 131 {
106 public: 132 public:
107 ThreadPool(); 133 ThreadPool();
108 ~ThreadPool(); 134 ~ThreadPool();
109 static ThreadPoolHolder getInstance();
110 135
111 void dispose( sal_Int64 nDisposeId ); 136 void dispose( sal_Int64 nDisposeId );
112 void destroy( sal_Int64 nDisposeId ); 137 void destroy( sal_Int64 nDisposeId );
@@ -124,7 +149,12 @@ namespace cppu_threadpool {
124 ********/ 149 ********/
125 sal_Bool revokeQueue( const ByteSequence & aThreadId , sal_Bool bAsynchron ); 150 sal_Bool revokeQueue( const ByteSequence & aThreadId , sal_Bool bAsynchron );
126 151
127 void waitInPool( ORequestThread *pThread ); 152 void waitInPool( rtl::Reference< ORequestThread > const & pThread );
153
154 void joinWorkers();
155
156 ThreadAdmin & getThreadAdmin() { return m_aThreadAdmin; }
157
128 private: 158 private:
129 void createThread( JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron); 159 void createThread( JobQueue *pQueue, const ByteSequence &aThreadId, sal_Bool bAsynchron);
130 160
@@ -136,8 +166,11 @@ namespace cppu_threadpool {
136 WaitingThreadList m_lstThreads; 166 WaitingThreadList m_lstThreads;
137 167
138 DisposedCallerAdminHolder m_DisposedCallerAdmin; 168 DisposedCallerAdminHolder m_DisposedCallerAdmin;
169 ThreadAdmin m_aThreadAdmin;
139 }; 170 };
140 171
141} // end namespace cppu_threadpool 172} // end namespace cppu_threadpool
142 173
174#endif
175
143/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ 176/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/cppu/util/makefile.mk b/cppu/util/makefile.mk
index ea0c09488c85..cf8e1bd5133a 100644
--- a/cppu/util/makefile.mk
+++ b/cppu/util/makefile.mk
@@ -56,7 +56,7 @@ SHL1TARGET=$(TARGET)$(UDK_MAJOR)
56SHL1TARGET= uno_$(TARGET) 56SHL1TARGET= uno_$(TARGET)
57.ENDIF 57.ENDIF
58 58
59SHL1STDLIBS = $(SALLIB) 59SHL1STDLIBS = $(SALLIB) $(SALHELPERLIB)
60 60
61SHL1DEPN= 61SHL1DEPN=
62.IF "$(COM)" == "MSC" 62.IF "$(COM)" == "MSC"
diff --git a/cpputools/source/unoexe/unoexe.cxx b/cpputools/source/unoexe/unoexe.cxx
index 684bc39934e1..46fcbf8b310a 100644
--- a/cpputools/source/unoexe/unoexe.cxx
+++ b/cpputools/source/unoexe/unoexe.cxx
@@ -847,6 +847,10 @@ SAL_IMPLEMENT_MAIN_WITH_ARGS(argc,)
847 if (! xComp.is()) 847 if (! xComp.is())
848 throw RuntimeException( OUString( RTL_CONSTASCII_USTRINGPARAM("bridge factory does not export interface \"com.sun.star.lang.XComponent\"!" ) ), Reference< XInterface >() ); 848 throw RuntimeException( OUString( RTL_CONSTASCII_USTRINGPARAM("bridge factory does not export interface \"com.sun.star.lang.XComponent\"!" ) ), Reference< XInterface >() );
849 ODisposingListener::waitFor( xComp ); 849 ODisposingListener::waitFor( xComp );
850 xComp->dispose();
851 // explicitly dispose the remote bridge so that it joins
852 // on all spawned threads before process exit (see
853 // binaryurp/source/bridge.cxx for details)
850 break; 854 break;
851 } 855 }
852 } 856 }