summaryrefslogtreecommitdiff
path: root/comphelper/source
diff options
context:
space:
mode:
authorMichael Meeks <michael.meeks@collabora.com>2014-10-30 18:37:42 +0000
committerMichael Meeks <michael.meeks@collabora.com>2014-10-30 22:12:27 +0000
commit62090f65b804a08a66ca26675ae610ed07c7c341 (patch)
tree8b227d43555e4c10d5b410ffeb6a9b9b0d9e4220 /comphelper/source
parent2f55701c550950ab4530df3c9ca305a819e3cabb (diff)
Move thread-pool down into comphelper for re-use elsewhere.
Change-Id: Ib27b8b1ccc07ff194035d6c2ef3d45c429e3cea1
Diffstat (limited to 'comphelper/source')
-rw-r--r--comphelper/source/misc/threadpool.cxx185
1 files changed, 185 insertions, 0 deletions
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
new file mode 100644
index 000000000000..d2101ada54d2
--- /dev/null
+++ b/comphelper/source/misc/threadpool.cxx
@@ -0,0 +1,185 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include <comphelper/threadpool.hxx>
+
+#include <rtl/instance.hxx>
+#include <boost/shared_ptr.hpp>
+#include <thread>
+#include <algorithm>
+
+namespace comphelper {
+
+class ThreadPool::ThreadWorker : public salhelper::Thread
+{
+ ThreadPool *mpPool;
+ osl::Condition maNewWork;
+public:
+ ThreadWorker( ThreadPool *pPool ) :
+ salhelper::Thread("thread-pool"),
+ mpPool( pPool ) {}
+
+ virtual void execute() SAL_OVERRIDE
+ {
+ ThreadTask *pTask;
+ while ( ( pTask = waitForWork() ) )
+ {
+ pTask->doWork();
+ delete pTask;
+ }
+ }
+
+ ThreadTask *waitForWork()
+ {
+ ThreadTask *pRet = NULL;
+
+ osl::ResettableMutexGuard aGuard( mpPool->maGuard );
+
+ pRet = mpPool->popWork();
+
+ while( !pRet )
+ {
+ maNewWork.reset();
+
+ if( mpPool->mbTerminate )
+ break;
+
+ aGuard.clear(); // unlock
+
+ maNewWork.wait();
+
+ aGuard.reset(); // lock
+
+ pRet = mpPool->popWork();
+ }
+
+ return pRet;
+ }
+
+ // Why a condition per worker thread - you may ask.
+ //
+ // Unfortunately the Windows synchronisation API that we wrap
+ // is horribly inadequate cf.
+ // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
+ // The existing osl::Condition API should only ever be used
+ // between one producer and one consumer thread to avoid the
+ // lost wakeup problem.
+
+ void signalNewWork()
+ {
+ maNewWork.set();
+ }
+};
+
+ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
+ mbTerminate( false )
+{
+ for( sal_Int32 i = 0; i < nWorkers; i++ )
+ maWorkers.push_back( new ThreadWorker( this ) );
+
+ maTasksEmpty.reset();
+
+ osl::MutexGuard aGuard( maGuard );
+ for( size_t i = 0; i < maWorkers.size(); i++ )
+ maWorkers[ i ]->launch();
+}
+
+ThreadPool::~ThreadPool()
+{
+ waitUntilWorkersDone();
+}
+
+struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >,
+ ThreadPoolStatic >
+{
+ boost::shared_ptr< ThreadPool > operator () () {
+ sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U );
+ return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) );
+ };
+};
+
+ThreadPool& ThreadPool::getSharedOptimalPool()
+{
+ return *ThreadPoolStatic::get().get();
+}
+
+/// wait until all the workers have completed and
+/// terminate all threads
+void ThreadPool::waitUntilWorkersDone()
+{
+ waitUntilEmpty();
+
+ osl::ResettableMutexGuard aGuard( maGuard );
+ mbTerminate = true;
+
+ while( !maWorkers.empty() )
+ {
+ rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
+ maWorkers.pop_back();
+ assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
+ == maWorkers.end());
+ xWorker->signalNewWork();
+ aGuard.clear();
+ { // unlocked
+ xWorker->join();
+ xWorker.clear();
+ }
+ aGuard.reset();
+ }
+}
+
+void ThreadPool::pushTask( ThreadTask *pTask )
+{
+ osl::MutexGuard aGuard( maGuard );
+ maTasks.insert( maTasks.begin(), pTask );
+ // horrible beyond belief:
+ for( size_t i = 0; i < maWorkers.size(); i++ )
+ maWorkers[ i ]->signalNewWork();
+ maTasksEmpty.reset();
+}
+
+ThreadTask *ThreadPool::popWork()
+{
+ if( !maTasks.empty() )
+ {
+ ThreadTask *pTask = maTasks.back();
+ maTasks.pop_back();
+ return pTask;
+ }
+ else
+ maTasksEmpty.set();
+ return NULL;
+}
+
+void ThreadPool::waitUntilEmpty()
+{
+ osl::ResettableMutexGuard aGuard( maGuard );
+
+ if( maWorkers.empty() )
+ { // no threads at all -> execute the work in-line
+ ThreadTask *pTask;
+ while ( ( pTask = popWork() ) )
+ {
+ pTask->doWork();
+ delete pTask;
+ }
+ mbTerminate = true;
+ }
+ else
+ {
+ aGuard.clear();
+ maTasksEmpty.wait();
+ aGuard.reset();
+ }
+ assert( maTasks.empty() );
+}
+
+} // namespace comphelper
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */