summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatúš Kukan <matus.kukan@collabora.com>2013-11-20 17:09:41 +0100
committerMatúš Kukan <matus.kukan@collabora.com>2013-11-20 17:23:29 +0100
commited89a069f462ae106802e0d1376c38723c2c12cb (patch)
tree87cd707cd2ce69650dc488042aab5f87a3274ddd
parent32a621027f1a234a85b3659b93752a9263d8e860 (diff)
datastreams: read data in another thread
Change-Id: Iedd4075eadce9ca8fc41b279ea03c2679b01ec71
-rw-r--r--sc/source/ui/inc/datastreams.hxx15
-rw-r--r--sc/source/ui/miscdlgs/datastreams.cxx129
-rw-r--r--sc/source/ui/miscdlgs/datastreamsdlg.cxx7
3 files changed, 129 insertions, 22 deletions
diff --git a/sc/source/ui/inc/datastreams.hxx b/sc/source/ui/inc/datastreams.hxx
index 80f9cd651487..93d157483b23 100644
--- a/sc/source/ui/inc/datastreams.hxx
+++ b/sc/source/ui/inc/datastreams.hxx
@@ -14,23 +14,30 @@
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
+#include <vector>
-namespace datastreams { class CallerThread; }
+namespace datastreams {
+ class CallerThread;
+ class ReaderThread;
+}
class ScDocShell;
class ScDocument;
class ScRange;
class SvStream;
class Window;
+typedef std::vector<OString> LinesList;
+
class DataStreams : boost::noncopyable
{
public:
enum MoveEnum { NO_MOVE, RANGE_DOWN, MOVE_DOWN, MOVE_UP };
DataStreams(ScDocShell *pScDocShell);
~DataStreams();
+ OString ConsumeLine();
bool ImportData();
void MoveData();
- void Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine,
+ void Set(SvStream *pStream, bool bValuesInLine,
const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove);
void ShowDialog(Window *pParent);
void Start();
@@ -43,11 +50,13 @@ private:
bool mbRunning;
bool mbIsUndoEnabled;
bool mbValuesInLine;
+ LinesList *mpLines;
+ size_t mnLinesCount;
boost::scoped_ptr<ScRange> mpRange;
boost::scoped_ptr<ScRange> mpStartRange;
boost::scoped_ptr<ScRange> mpEndRange;
- boost::scoped_ptr<SvStream> mpStream;
rtl::Reference<datastreams::CallerThread> mxThread;
+ rtl::Reference<datastreams::ReaderThread> mxReaderThread;
};
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/ui/miscdlgs/datastreams.cxx b/sc/source/ui/miscdlgs/datastreams.cxx
index b9385135a07d..9d66ee362ea2 100644
--- a/sc/source/ui/miscdlgs/datastreams.cxx
+++ b/sc/source/ui/miscdlgs/datastreams.cxx
@@ -24,6 +24,8 @@
#include <tabvwsh.hxx>
#include <viewdata.hxx>
+#include <queue>
+
namespace datastreams {
class CallerThread : public salhelper::Thread
@@ -57,6 +59,82 @@ private:
}
};
+class ReaderThread : public salhelper::Thread
+{
+ SvStream *mpStream;
+public:
+ bool mbTerminateReading;
+ osl::Condition maProduceResume;
+ osl::Condition maConsumeResume;
+ osl::Mutex maLinesProtector;
+ std::queue<LinesList* > maPendingLines;
+ std::queue<LinesList* > maUsedLines;
+
+ ReaderThread(SvStream *pData):
+ Thread("ReaderThread")
+ ,mpStream(pData)
+ ,mbTerminateReading(false)
+ {
+ }
+
+ virtual ~ReaderThread()
+ {
+ delete mpStream;
+ while (!maPendingLines.empty())
+ {
+ delete maPendingLines.front();
+ maPendingLines.pop();
+ }
+ while (!maUsedLines.empty())
+ {
+ delete maUsedLines.front();
+ maUsedLines.pop();
+ }
+ }
+
+ void terminate()
+ {
+ mbTerminateReading = true;
+ maProduceResume.set();
+ join();
+ }
+
+private:
+ virtual void execute() SAL_OVERRIDE
+ {
+ while (!mbTerminateReading)
+ {
+ LinesList *pLines = 0;
+ osl::ResettableMutexGuard aGuard(maLinesProtector);
+ if (!maUsedLines.empty())
+ {
+ pLines = maUsedLines.front();
+ maUsedLines.pop();
+ aGuard.clear(); // unlock
+ }
+ else
+ {
+ aGuard.clear(); // unlock
+ pLines = new LinesList(10);
+ }
+ for (size_t i = 0; i < pLines->size(); ++i)
+ mpStream->ReadLine( pLines->at(i) );
+ aGuard.reset(); // lock
+ while (!mbTerminateReading && maPendingLines.size() >= 8)
+ { // pause reading for a bit
+ aGuard.clear(); // unlock
+ maProduceResume.wait();
+ maProduceResume.reset();
+ aGuard.reset(); // lock
+ }
+ maPendingLines.push(pLines);
+ maConsumeResume.set();
+ if (!mpStream->good())
+ mbTerminateReading = true;
+ }
+ }
+};
+
}
DataStreams::DataStreams(ScDocShell *pScDocShell):
@@ -64,6 +142,8 @@ DataStreams::DataStreams(ScDocShell *pScDocShell):
, mpScDocument(mpScDocShell->GetDocument())
, meMove(NO_MOVE)
, mbRunning(false)
+ , mpLines(0)
+ , mnLinesCount(0)
{
mxThread = new datastreams::CallerThread( this );
mxThread->launch();
@@ -76,6 +156,31 @@ DataStreams::~DataStreams()
mxThread->mbTerminate = true;
mxThread->maStart.set();
mxThread->join();
+ if (mxReaderThread.is())
+ mxReaderThread->terminate();
+}
+
+OString DataStreams::ConsumeLine()
+{
+ if (!mpLines || mnLinesCount >= mpLines->size())
+ {
+ mnLinesCount = 0;
+ osl::ResettableMutexGuard aGuard(mxReaderThread->maLinesProtector);
+ if (mpLines)
+ mxReaderThread->maUsedLines.push(mpLines);
+ while (mxReaderThread->maPendingLines.empty())
+ {
+ aGuard.clear(); // unlock
+ mxReaderThread->maConsumeResume.wait();
+ mxReaderThread->maConsumeResume.reset();
+ aGuard.reset(); // lock
+ }
+ mpLines = mxReaderThread->maPendingLines.front();
+ mxReaderThread->maPendingLines.pop();
+ if (mxReaderThread->maPendingLines.size() <= 4)
+ mxReaderThread->maProduceResume.set(); // start producer again
+ }
+ return mpLines->at(mnLinesCount++);
}
void DataStreams::Start()
@@ -117,13 +222,13 @@ void DataStreams::Stop()
mpScDocument->EnableUndo(mbIsUndoEnabled);
}
-void DataStreams::Set(const OUString& rUrl, bool bIsScript, bool bValuesInLine,
+void DataStreams::Set(SvStream *pStream, bool bValuesInLine,
const OUString& rRange, sal_Int32 nLimit, MoveEnum eMove)
{
- if (bIsScript)
- mpStream.reset( new SvScriptStream(rUrl) );
- else
- mpStream.reset( new SvFileStream(rUrl, STREAM_READ) );
+ if (mxReaderThread.is())
+ mxReaderThread->terminate();
+ mxReaderThread = new datastreams::ReaderThread( pStream );
+ mxReaderThread->launch();
mpEndRange.reset( NULL );
mpRange.reset ( new ScRange() );
@@ -170,14 +275,6 @@ void DataStreams::MoveData()
bool DataStreams::ImportData()
{
- if (!mpStream->good())
- {
- // if there is a problem with SvStream, stop running
- mbRunning = false;
- return mbRunning;
- }
-
- OString sTmp;
SolarMutexGuard aGuard;
MoveData();
if (mbValuesInLine)
@@ -186,8 +283,7 @@ bool DataStreams::ImportData()
OStringBuffer aBuf;
while (nHeight--)
{
- mpStream->ReadLine(sTmp);
- aBuf.append(sTmp);
+ aBuf.append(ConsumeLine());
aBuf.append('\n');
}
SvMemoryStream aMemoryStream((void *)aBuf.getStr(), aBuf.getLength(), STREAM_READ);
@@ -202,8 +298,7 @@ bool DataStreams::ImportData()
// read more lines at once but not too much
for (int i = 0; i < 10; ++i)
{
- mpStream->ReadLine(sTmp);
- OUString sLine(OStringToOUString(sTmp, RTL_TEXTENCODING_UTF8));
+ OUString sLine( OStringToOUString(ConsumeLine(), RTL_TEXTENCODING_UTF8) );
if (sLine.indexOf(',') <= 0)
continue;
diff --git a/sc/source/ui/miscdlgs/datastreamsdlg.cxx b/sc/source/ui/miscdlgs/datastreamsdlg.cxx
index cbe19a3b4472..bacb67aeedd6 100644
--- a/sc/source/ui/miscdlgs/datastreamsdlg.cxx
+++ b/sc/source/ui/miscdlgs/datastreamsdlg.cxx
@@ -74,11 +74,14 @@ DataStreamsDlg::DataStreamsDlg(DataStreams *pDataStreams, Window* pParent)
void DataStreamsDlg::Start()
{
- bool bIsScript = m_pRBScriptData->IsChecked();
sal_Int32 nLimit = 0;
if (m_pRBMaxLimit->IsChecked())
nLimit = m_pEdLimit->GetText().toInt32();
- mpDataStreams->Set( m_pCbUrl->GetText(), bIsScript, m_pRBValuesInLine->IsChecked(),
+ mpDataStreams->Set(
+ (m_pRBScriptData->IsChecked() ?
+ dynamic_cast<SvStream*>( new SvScriptStream(m_pCbUrl->GetText()) ) :
+ dynamic_cast<SvStream*>( new SvFileStream(m_pCbUrl->GetText(), STREAM_READ) )),
+ m_pRBValuesInLine->IsChecked(),
m_pEdRange->GetText(), nLimit, (m_pRBNoMove->IsChecked() ? DataStreams::NO_MOVE :
m_pRBRangeDown->IsChecked() ? DataStreams::RANGE_DOWN : DataStreams::MOVE_DOWN) );
mpDataStreams->Start();