summaryrefslogtreecommitdiff
path: root/comphelper
diff options
context:
space:
mode:
authorLuboš Luňák <l.lunak@collabora.com>2020-10-05 12:14:34 +0200
committerLuboš Luňák <l.lunak@collabora.com>2020-10-05 19:36:16 +0200
commit2ad4e77a0f266ae6e6fccaebb1d080d2880bdac3 (patch)
tree8d0f004997fb2f65cbfdd81a4e5bd5e5fa380927 /comphelper
parenta4dbf43bf505980e6acc587aa13096c215aa99e9 (diff)
fix allocating thread pool workers
Tasks are removed from the queue before a worker starts working on it, which means that maTasks.size() is not the number of tasks to do, because the worked on tasks are not included there. This means the code could spawn only a smaller number of workers than were needed (and than CPU cores that are available). Change-Id: Ic6e6a79316cf48d82f2b80be7ad477b723b2c4e5 Reviewed-on: https://gerrit.libreoffice.org/c/core/+/103955 Tested-by: Jenkins Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
Diffstat (limited to 'comphelper')
-rw-r--r--comphelper/CppunitTest_comphelper_threadpool_test.mk1
-rw-r--r--comphelper/qa/unit/threadpooltest.cxx50
-rw-r--r--comphelper/source/misc/threadpool.cxx24
3 files changed, 73 insertions, 2 deletions
diff --git a/comphelper/CppunitTest_comphelper_threadpool_test.mk b/comphelper/CppunitTest_comphelper_threadpool_test.mk
index 16bbd6fff69b..24467c898f80 100644
--- a/comphelper/CppunitTest_comphelper_threadpool_test.mk
+++ b/comphelper/CppunitTest_comphelper_threadpool_test.mk
@@ -24,6 +24,7 @@ $(eval $(call gb_CppunitTest_use_libraries,comphelper_threadpool_test, \
cppuhelper \
cppu \
sal \
+ tl \
))
# vim: set noet sw=4 ts=4:
diff --git a/comphelper/qa/unit/threadpooltest.cxx b/comphelper/qa/unit/threadpooltest.cxx
index 10fb90c3014f..03bd4a33d69c 100644
--- a/comphelper/qa/unit/threadpooltest.cxx
+++ b/comphelper/qa/unit/threadpooltest.cxx
@@ -12,17 +12,22 @@
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/plugin/TestPlugIn.h>
+#include <tools/time.hxx>
+#include <osl/thread.hxx>
#include <stdlib.h>
#include <thread>
+#include <mutex>
class ThreadPoolTest : public CppUnit::TestFixture
{
public:
void testPreferredConcurrency();
+ void testWorkerUsage();
CPPUNIT_TEST_SUITE(ThreadPoolTest);
CPPUNIT_TEST(testPreferredConcurrency);
+ CPPUNIT_TEST(testWorkerUsage);
CPPUNIT_TEST_SUITE_END();
};
@@ -48,6 +53,51 @@ void ThreadPoolTest::testPreferredConcurrency()
#endif
}
+namespace
+{
+class UsageTask : public comphelper::ThreadTask
+{
+public:
+ UsageTask(const std::shared_ptr<comphelper::ThreadTaskTag>& pTag)
+ : ThreadTask(pTag)
+ {
+ }
+ virtual void doWork()
+ {
+ ++count;
+ mutex.lock();
+ mutex.unlock();
+ }
+ static inline int count = 0;
+ static inline std::mutex mutex;
+};
+} // namespace
+
+void ThreadPoolTest::testWorkerUsage()
+{
+ // Create tasks for each available worker. Lock a shared mutex before that to make all
+ // tasks block on it. And check that all workers have started, i.e. that the full
+ // thread pool capacity is used.
+ comphelper::ThreadPool& rSharedPool = comphelper::ThreadPool::getSharedOptimalPool();
+ std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
+ UsageTask::mutex.lock();
+ for (int i = 0; i < rSharedPool.getWorkerCount(); ++i)
+ {
+ rSharedPool.pushTask(std::make_unique<UsageTask>(pTag));
+ osl::Thread::wait(std::chrono::milliseconds(10)); // give it a time to start
+ }
+ sal_uInt64 startTicks = tools::Time::GetSystemTicks();
+ while (UsageTask::count != rSharedPool.getWorkerCount())
+ {
+ // Wait at most 5 seconds, that should do even on slow systems.
+ CPPUNIT_ASSERT_MESSAGE("Thread pool does not use all worker threads.",
+ startTicks + 5000 > tools::Time::GetSystemTicks());
+ osl::Thread::wait(std::chrono::milliseconds(10));
+ }
+ UsageTask::mutex.unlock();
+ rSharedPool.waitUntilDone(pTag);
+}
+
CPPUNIT_TEST_SUITE_REGISTRATION(ThreadPoolTest);
CPPUNIT_PLUGIN_IMPLEMENT();
diff --git a/comphelper/source/misc/threadpool.cxx b/comphelper/source/misc/threadpool.cxx
index f93400d96f9f..906189202cdd 100644
--- a/comphelper/source/misc/threadpool.cxx
+++ b/comphelper/source/misc/threadpool.cxx
@@ -79,12 +79,14 @@ public:
std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
if( pTask )
{
+ mpPool->incBusyWorker();
aGuard.unlock();
pTask->exec();
pTask.reset();
aGuard.lock();
+ mpPool->decBusyWorker();
}
}
}
@@ -92,7 +94,8 @@ public:
ThreadPool::ThreadPool(sal_Int32 nWorkers)
: mbTerminate(true)
- , mnWorkers(nWorkers)
+ , mnMaxWorkers(nWorkers)
+ , mnBusyWorkers(0)
{
}
@@ -104,6 +107,7 @@ ThreadPool::~ThreadPool()
// still 0, but hopefully they will be more helpful on non-WNT platforms
assert(mbTerminate);
assert(maTasks.empty());
+ assert(mnBusyWorkers == 0);
}
namespace {
@@ -198,7 +202,8 @@ void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
mbTerminate = false;
- if (maWorkers.size() < mnWorkers && maWorkers.size() <= maTasks.size())
+ // Worked on tasks are already removed from maTasks, so include the count of busy workers.
+ if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
{
maWorkers.push_back( new ThreadWorker( this ) );
maWorkers.back()->launch();
@@ -230,6 +235,17 @@ std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mu
return nullptr;
}
+void ThreadPool::incBusyWorker()
+{
+ ++mnBusyWorkers;
+}
+
+void ThreadPool::decBusyWorker()
+{
+ --mnBusyWorkers;
+ assert(mnBusyWorkers >= 0);
+}
+
void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoinAll)
{
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
@@ -294,6 +310,10 @@ void ThreadTask::exec()
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
}
+ catch (...)
+ {
+ SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
+ }
pTag->onTaskWorkerDone();
}