summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Meeks <michael.meeks@collabora.com>2013-11-27 18:11:34 +0000
committerMichael Meeks <michael.meeks@collabora.com>2013-11-27 19:19:50 +0000
commit0710299634a2276749c36ed86a5a60a20d63073f (patch)
tree6bcd379e905c902d50cb9a10b3fcc6d53d55f4dc
parent8bdf8dc075928bd0eb3e49ac0f51727c0677dc37 (diff)
sc: threaded parsing of the core data inside large XLSX files
Enabled in experimental mode only or via SC_IMPORT_THREADS=<N> this allows significant parallelisation of sheet reading. I also implement a simple thread pool to manage that. Change-Id: I66c72211f2699490230e993a374c26b1892eac12
-rw-r--r--sc/Library_scfilt.mk1
-rw-r--r--sc/source/filter/inc/sheetdatacontext.hxx11
-rw-r--r--sc/source/filter/oox/sheetdatacontext.cxx6
-rw-r--r--sc/source/filter/oox/threadpool.cxx162
-rw-r--r--sc/source/filter/oox/threadpool.hxx53
-rw-r--r--sc/source/filter/oox/workbookfragment.cxx212
6 files changed, 285 insertions, 160 deletions
diff --git a/sc/Library_scfilt.mk b/sc/Library_scfilt.mk
index 499f87326295..eb0d5d247036 100644
--- a/sc/Library_scfilt.mk
+++ b/sc/Library_scfilt.mk
@@ -211,6 +211,7 @@ $(eval $(call gb_Library_add_exception_objects,scfilt,\
sc/source/filter/oox/tablebuffer \
sc/source/filter/oox/tablefragment \
sc/source/filter/oox/themebuffer \
+ sc/source/filter/oox/threadpool \
sc/source/filter/oox/unitconverter \
sc/source/filter/oox/viewsettings \
sc/source/filter/oox/workbookfragment \
diff --git a/sc/source/filter/inc/sheetdatacontext.hxx b/sc/source/filter/inc/sheetdatacontext.hxx
index b492d2ab4ea7..3f3e377f5c54 100644
--- a/sc/source/filter/inc/sheetdatacontext.hxx
+++ b/sc/source/filter/inc/sheetdatacontext.hxx
@@ -23,6 +23,9 @@
#include "excelhandlers.hxx"
#include "richstring.hxx"
#include "sheetdatabuffer.hxx"
+#include <vcl/svapp.hxx>
+
+#define MULTI_THREAD_SHEET_PARSING 1
namespace oox {
namespace xls {
@@ -54,8 +57,16 @@ struct SheetDataContextBase
*/
class SheetDataContext : public WorksheetContextBase, private SheetDataContextBase
{
+ // If we are doing threaded parsing, this SheetDataContext
+ // forms the inner loop for bulk data parsing, and for the
+ // duration of this we can drop the solar mutex.
+#if MULTI_THREAD_SHEET_PARSING
+ SolarMutexReleaser aReleaser;
+#endif
+
public:
explicit SheetDataContext( WorksheetFragmentBase& rFragment );
+ virtual ~SheetDataContext();
protected:
virtual ::oox::core::ContextHandlerRef onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs );
diff --git a/sc/source/filter/oox/sheetdatacontext.cxx b/sc/source/filter/oox/sheetdatacontext.cxx
index 5170234158ea..9a0f7dfd83f6 100644
--- a/sc/source/filter/oox/sheetdatacontext.cxx
+++ b/sc/source/filter/oox/sheetdatacontext.cxx
@@ -90,6 +90,12 @@ SheetDataContext::SheetDataContext( WorksheetFragmentBase& rFragment ) :
mnRow( -1 ),
mnCol( -1 )
{
+ SAL_INFO( "sc.filter", "start safe sheet data context - unlock\n" );
+}
+
+SheetDataContext::~SheetDataContext()
+{
+ SAL_INFO( "sc.filter", "end safe sheet data context - relock\n" );
}
ContextHandlerRef SheetDataContext::onCreateContext( sal_Int32 nElement, const AttributeList& rAttribs )
diff --git a/sc/source/filter/oox/threadpool.cxx b/sc/source/filter/oox/threadpool.cxx
new file mode 100644
index 000000000000..9de1a1454ceb
--- /dev/null
+++ b/sc/source/filter/oox/threadpool.cxx
@@ -0,0 +1,162 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#include "threadpool.hxx"
+
+class ThreadPool::ThreadWorker : public salhelper::Thread
+{
+ ThreadPool *mpPool;
+ osl::Condition maNewWork;
+public:
+ ThreadWorker( ThreadPool *pPool ) :
+ salhelper::Thread("sheet-import-thread-pool"),
+ mpPool( pPool ) {}
+
+ virtual void execute()
+ {
+ ThreadTask *pTask;
+ while ( ( pTask = waitForWork() ) )
+ {
+ pTask->doWork();
+ delete pTask;
+ }
+ }
+
+ ThreadTask *waitForWork()
+ {
+ ThreadTask *pRet = NULL;
+
+ osl::ResettableMutexGuard aGuard( mpPool->maGuard );
+
+ pRet = mpPool->popWork();
+
+ while( !pRet )
+ {
+ maNewWork.reset();
+
+ if( mpPool->mbTerminate )
+ break;
+
+ aGuard.clear(); // unlock
+
+ maNewWork.wait();
+
+ aGuard.reset(); // lock
+
+ pRet = mpPool->popWork();
+ }
+
+ return pRet;
+ }
+
+ //
+ // Why a condition per worker thread - you may ask.
+ //
+ // Unfortunately the Windows synchronisation API that we wrap
+ // is horribly inadequate cf.
+ // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
+ // The existing osl::Condition API should only ever be used
+ // between one producer and one consumer thread to avoid the
+ // lost wakeup problem.
+ //
+ void signalNewWork()
+ {
+ maNewWork.set();
+ }
+};
+
+ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
+ mbTerminate( false )
+{
+ for( sal_Int32 i = 0; i < nWorkers; i++ )
+ maWorkers.push_back( new ThreadWorker( this ) );
+
+ maTasksEmpty.reset();
+
+ osl::MutexGuard aGuard( maGuard );
+ for( size_t i = 0; i < maWorkers.size(); i++ )
+ maWorkers[ i ]->launch();
+}
+
+ThreadPool::~ThreadPool()
+{
+ waitUntilWorkersDone();
+}
+
+/// wait until all the workers have completed and
+/// terminate all threads
+void ThreadPool::waitUntilWorkersDone()
+{
+ waitUntilEmpty();
+
+ osl::ResettableMutexGuard aGuard( maGuard );
+ mbTerminate = true;
+
+ while( !maWorkers.empty() )
+ {
+ rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
+ maWorkers.pop_back();
+ assert( maWorkers.find( xWorker ) == maWorkers.end() );
+ xWorker->signalNewWork();
+ aGuard.clear();
+ { // unlocked
+ xWorker->join();
+ xWorker.clear();
+ }
+ aGuard.reset();
+ }
+}
+
+void ThreadPool::pushTask( ThreadTask *pTask )
+{
+ osl::MutexGuard aGuard( maGuard );
+ maTasks.insert( maTasks.begin(), pTask );
+ // horrible beyond belief:
+ for( size_t i = 0; i < maWorkers.size(); i++ )
+ maWorkers[ i ]->signalNewWork();
+ maTasksEmpty.reset();
+}
+
+ThreadTask *ThreadPool::popWork()
+{
+ if( !maTasks.empty() )
+ {
+ ThreadTask *pTask = maTasks.back();
+ maTasks.pop_back();
+ return pTask;
+ }
+ else
+ maTasksEmpty.set();
+ return NULL;
+}
+
+void ThreadPool::waitUntilEmpty()
+{
+ osl::ResettableMutexGuard aGuard( maGuard );
+
+ if( maWorkers.empty() )
+ { // no threads at all -> execute the work in-line
+ ThreadTask *pTask;
+ while ( ( pTask = popWork() ) )
+ {
+ pTask->doWork();
+ delete pTask;
+ }
+ mbTerminate = true;
+ }
+ else
+ {
+ aGuard.clear();
+ maTasksEmpty.wait();
+ aGuard.reset();
+ }
+ assert( maTasks.empty() );
+}
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/filter/oox/threadpool.hxx b/sc/source/filter/oox/threadpool.hxx
new file mode 100644
index 000000000000..036534f6ff9d
--- /dev/null
+++ b/sc/source/filter/oox/threadpool.hxx
@@ -0,0 +1,53 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
+/*
+ * This file is part of the LibreOffice project.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+#ifndef SC_THREADPOOL_HXX
+#define SC_THREADPOOL_HXX
+
+#include <sal/config.h>
+#include <salhelper/thread.hxx>
+#include <osl/mutex.hxx>
+#include <osl/conditn.hxx>
+#include <rtl/ref.hxx>
+#include <vector>
+
+class ThreadTask
+{
+public:
+ virtual ~ThreadTask() {}
+ virtual void doWork() = 0;
+};
+
+/// A very basic thread pool implementation
+class ThreadPool
+{
+public:
+ ThreadPool( sal_Int32 nWorkers );
+ virtual ~ThreadPool();
+ void pushTask( ThreadTask *pTask /* takes ownership */ );
+ void waitUntilEmpty();
+ void waitUntilWorkersDone();
+
+private:
+ class ThreadWorker;
+ friend class ThreadWorker;
+
+ ThreadTask *waitForWork( osl::Condition &rNewWork );
+ ThreadTask *popWork();
+
+ osl::Mutex maGuard;
+ osl::Condition maTasksEmpty;
+ bool mbTerminate;
+ std::vector< rtl::Reference< ThreadWorker > > maWorkers;
+ std::vector< ThreadTask * > maTasks;
+};
+
+#endif // SC_THREADPOOL_HXX
+
+/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/filter/oox/workbookfragment.cxx b/sc/source/filter/oox/workbookfragment.cxx
index 7acb16ae4699..d3c06ac90c73 100644
--- a/sc/source/filter/oox/workbookfragment.cxx
+++ b/sc/source/filter/oox/workbookfragment.cxx
@@ -42,11 +42,16 @@
#include "workbooksettings.hxx"
#include "worksheetbuffer.hxx"
#include "worksheetfragment.hxx"
+#include "sheetdatacontext.hxx"
+#include "threadpool.hxx"
+#include "officecfg/Office/Common.hxx"
#include "document.hxx"
#include "docsh.hxx"
#include "calcconfig.hxx"
+#include <vcl/svapp.hxx>
+
#include <oox/core/fastparser.hxx>
#include <salhelper/thread.hxx>
#include <osl/conditn.hxx>
@@ -54,8 +59,6 @@
#include <queue>
#include <boost/scoped_ptr.hpp>
-#define MULTI_THREAD_SHEET_PARSING 0
-
#include "oox/ole/vbaproject.hxx"
namespace oox {
@@ -204,188 +207,77 @@ namespace {
typedef std::pair<WorksheetGlobalsRef, FragmentHandlerRef> SheetFragmentHandler;
typedef std::vector<SheetFragmentHandler> SheetFragmentVector;
-#if MULTI_THREAD_SHEET_PARSING
-
-class WorkerThread;
-typedef rtl::Reference<WorkerThread> WorkerThreadRef;
-
-struct WorkerThreadData
-{
- osl::Mutex maMtx;
- std::vector<WorkerThreadRef> maThreads;
-};
-
-struct IdleWorkerThreadData
-{
- osl::Mutex maMtx;
- osl::Condition maCondAdded;
- std::queue<WorkerThread*> maThreads;
-};
-
-struct
-{
- boost::scoped_ptr<WorkerThreadData> mpWorkerThreads;
- boost::scoped_ptr<IdleWorkerThreadData> mpIdleThreads;
-
-} aThreadGlobals;
-
-enum WorkerAction
-{
- None = 0,
- TerminateThread,
- Work
-};
-
-class WorkerThread : public salhelper::Thread
+class WorkerThread : public ThreadTask
{
WorkbookFragment& mrWorkbookHandler;
- size_t mnID;
- FragmentHandlerRef mxHandler;
- boost::scoped_ptr<oox::core::FastParser> mxParser;
- osl::Mutex maMtxAction;
- osl::Condition maCondActionChanged;
- WorkerAction meAction;
-public:
- WorkerThread( WorkbookFragment& rWorkbookHandler, size_t nID ) :
- salhelper::Thread("sheet-import-worker-thread"),
- mrWorkbookHandler(rWorkbookHandler),
- mnID(nID),
- mxParser(rWorkbookHandler.getOoxFilter().createParser()),
- meAction(None) {}
-
- virtual void execute()
- {
- announceIdle();
-
- // Keep looping until the terminate request is set.
- for (maCondActionChanged.wait(); true; maCondActionChanged.wait())
- {
- osl::MutexGuard aGuard(maMtxAction);
- if (!maCondActionChanged.check())
- // Wait again.
- continue;
-
- maCondActionChanged.reset();
-
- if (meAction == TerminateThread)
- // End the thread.
- return;
-
- if (meAction != Work)
- continue;
-
-#if 0
- // TODO : This still deadlocks in the fast parser code.
- mrWorkbookHandler.importOoxFragment(mxHandler, *mxParser);
-#else
- double val = rand() / static_cast<double>(RAND_MAX);
- val *= 1000000; // normalize to 1 second.
- val *= 1.5; // inflate it a bit.
- usleep(val); // pretend to be working while asleep.
-#endif
- announceIdle();
- }
- }
-
- void announceIdle()
- {
- // Set itself idle to receive a new task from the main thread.
- osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
- aThreadGlobals.mpIdleThreads->maThreads.push(this);
- aThreadGlobals.mpIdleThreads->maCondAdded.set();
- }
+ rtl::Reference<FragmentHandler> mxHandler;
- void terminate()
+public:
+ WorkerThread( WorkbookFragment& rWorkbookHandler,
+ const rtl::Reference<FragmentHandler>& xHandler ) :
+ mrWorkbookHandler( rWorkbookHandler ),
+ mxHandler( xHandler )
{
- osl::MutexGuard aGuard(maMtxAction);
- meAction = TerminateThread;
- maCondActionChanged.set();
}
- void assign( const FragmentHandlerRef& rHandler )
+ virtual void doWork()
{
- osl::MutexGuard aGuard(maMtxAction);
- mxHandler = rHandler;
- meAction = Work;
- maCondActionChanged.set();
+ // We hold the solar mutex in all threads except for
+ // the small safe section of the inner loop in
+ // sheetdatacontext.cxx
+ SAL_INFO( "sc.filter", "start wait on solar\n" );
+ SolarMutexGuard maGuard;
+ SAL_INFO( "sc.filter", "got solar\n" );
+
+ boost::scoped_ptr<oox::core::FastParser> xParser(
+ mrWorkbookHandler.getOoxFilter().createParser() );
+
+ SAL_INFO( "sc.filter", "start import\n" );
+ mrWorkbookHandler.importOoxFragment( mxHandler, *xParser );
+ SAL_INFO( "sc.filter", "end import, release solar\n" );
}
};
-#endif
-
void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVector& rSheets )
{
-#if MULTI_THREAD_SHEET_PARSING // threaded version
- size_t nThreadCount = 3;
- if (nThreadCount > rSheets.size())
- nThreadCount = rSheets.size();
+ sal_Int32 nThreads = std::min( rSheets.size(), (size_t) 4 /* FIXME: ncpus/2 */ );
- // Create new thread globals.
- aThreadGlobals.mpWorkerThreads.reset(new WorkerThreadData);
- aThreadGlobals.mpIdleThreads.reset(new IdleWorkerThreadData);
+ Reference< XComponentContext > xContext = comphelper::getProcessComponentContext();
- SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
+ // Force threading off unless experimental mode or env. var is set.
+ if( !officecfg::Office::Common::Misc::ExperimentalMode::get( xContext ) )
+ nThreads = 0;
- {
- // Initialize worker threads.
- osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
- for (size_t i = 0; i < nThreadCount; ++i)
- {
- WorkerThreadRef pThread(new WorkerThread(rWorkbookHandler, i));
- aThreadGlobals.mpWorkerThreads->maThreads.push_back(pThread);
- pThread->launch();
- }
- }
+ const char *pEnv;
+ if( ( pEnv = getenv( "SC_IMPORT_THREADS" ) ) )
+ nThreads = rtl_str_toInt32( pEnv, 10 );
- for (aThreadGlobals.mpIdleThreads->maCondAdded.wait(); true; aThreadGlobals.mpIdleThreads->maCondAdded.wait())
+ if( nThreads != 0 )
{
- osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
- if (!aThreadGlobals.mpIdleThreads->maCondAdded.check())
- // Wait again.
- continue;
-
- aThreadGlobals.mpIdleThreads->maCondAdded.reset();
-
- // Assign work to all idle threads.
- while (!aThreadGlobals.mpIdleThreads->maThreads.empty())
- {
- if (it == itEnd)
- break;
-
- WorkerThread* p = aThreadGlobals.mpIdleThreads->maThreads.front();
- aThreadGlobals.mpIdleThreads->maThreads.pop();
- p->assign(it->second);
- ++it;
- }
+ // test sequential read in this mode
+ if( nThreads < 0)
+ nThreads = 0;
+ ThreadPool aPool( nThreads );
- if (it == itEnd)
- // Finished! Exit the loop.
- break;
- }
+ SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
+ for( ; it != itEnd; ++it )
+ aPool.pushTask( new WorkerThread( rWorkbookHandler, it->second ) )
+ ;
- {
- // Terminate all worker threads.
- osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
- for (size_t i = 0, n = aThreadGlobals.mpWorkerThreads->maThreads.size(); i < n; ++i)
{
- WorkerThreadRef pWorker = aThreadGlobals.mpWorkerThreads->maThreads[i];
- pWorker->terminate();
- if (pWorker.is())
- pWorker->join();
+ // Ideally no-one else but our worker threads can re-acquire that.
+ // potentially if that causes a problem we might want to extend
+ // the SolarMutex functionality to allow passing it around.
+ SolarMutexReleaser aReleaser;
+ aPool.waitUntilWorkersDone();
}
}
-
- // Delete all thread globals.
- aThreadGlobals.mpWorkerThreads.reset();
- aThreadGlobals.mpIdleThreads.reset();
-
-#else // non-threaded version
- for( SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); it != itEnd; ++it)
+ else
{
- // import the sheet fragment
- rWorkbookHandler.importOoxFragment(it->second);
+ SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
+ for( ; it != itEnd; ++it )
+ rWorkbookHandler.importOoxFragment( it->second );
}
-#endif
}
}