author    Dennis Francis <dennis.francis@collabora.com>    2020-01-11 11:51:34 +0530
committer Dennis Francis <dennis.francis@collabora.com>    2020-01-13 12:11:44 +0100
commit    353d4528b8ad8abca9a13f3016632e42bab7afde (patch)
tree      a83950338ad79e77594638d9dd60bd073fdd2115 /package
parent    9dce33e6943dec5ff111802ec3e7c338abf56592 (diff)
tdf#125662: do parallel-zip in batches
In this approach the input stream is read one batch (of constant size) at a time and each batch is compressed by ThreadedDeflater. After we are done with a batch, the deflated buffer is processed straightaway (directed to file-backed storage).

Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/86596
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
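A minimal C++ sketch of the batch loop the commit message describes (spawnDeflateTasks, waitForTasks and emitDeflatedBatch are hypothetical stand-ins for the thread-pool plumbing; the real loop is in ThreadedDeflater::deflateWrite() below):

    // Sketch only: one batch is read, deflated in parallel, then written out
    // before the next batch is read, so memory use stays bounded by batchSize.
    const sal_Int64 batchSize = MaxBlockSize * nWorkerThreads;
    while (xInStream->available() > 0)
    {
        sal_Int64 nRead = xInStream->readBytes(inBuffer, batchSize); // one batch
        spawnDeflateTasks(inBuffer, nRead); // one Task per MaxBlockSize block
        waitForTasks();                     // join before reusing inBuffer
        emitDeflatedBatch();                // deflated data goes straight to storage
    }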
Diffstat (limited to 'package')
-rw-r--r--  package/inc/ThreadedDeflater.hxx           |  29
-rw-r--r--  package/source/zipapi/ThreadedDeflater.cxx | 118
-rw-r--r--  package/source/zipapi/ZipOutputEntry.cxx   |  32
3 files changed, 109 insertions(+), 70 deletions(-)
diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx
index 3bd7e4bc966a..f22a40a0c941 100644
--- a/package/inc/ThreadedDeflater.hxx
+++ b/package/inc/ThreadedDeflater.hxx
@@ -21,37 +21,48 @@
#define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX
#include <com/sun/star/uno/Sequence.hxx>
+#include <com/sun/star/io/XInputStream.hpp>
+#include <com/sun/star/uno/Reference.hxx>
#include <package/packagedllapi.hxx>
#include <comphelper/threadpool.hxx>
#include <atomic>
#include <memory>
+#include <vector>
+#include <functional>
namespace ZipUtils
{
/// Parallel compression of a stream using the zlib deflate algorithm.
///
-/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
-/// check with finished() or waitForTasks() and retrieve result with getOutput().
-/// The class will internally split into multiple threads.
+/// Call deflateWrite() with the input stream and input/output processing functions.
+/// This will use multiple threads for compression on each batch of data from the stream.
class ThreadedDeflater final
{
class Task;
// Note: All this should be lock-less. Each task writes only to its part
- // of the data, flags are atomic.
+ // of the data.
std::vector<std::vector<sal_Int8>> outBuffers;
std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
css::uno::Sequence<sal_Int8> inBuffer;
+ css::uno::Sequence<sal_Int8> prevDataBlock;
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> maProcessOutputFunc;
+ sal_Int64 totalIn;
+ sal_Int64 totalOut;
int zlibLevel;
- std::atomic<int> pendingTasksCount;
public:
// Unlike with the Deflater class, bNoWrap is always true.
ThreadedDeflater(sal_Int32 nSetLevel);
~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE;
- void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
- void waitForTasks();
- bool finished() const;
- css::uno::Sequence<sal_Int8> getOutput() const;
+ void deflateWrite(
+ const css::uno::Reference<css::io::XInputStream>& xInStream,
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc);
+ sal_Int64 getTotalIn() const { return totalIn; }
+ sal_Int64 getTotalOut() const { return totalOut; }
+
+private:
+ void processDeflatedBuffers();
void clear();
};
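For reference, a hypothetical caller of the deflateWrite() API declared above might wire the two callbacks like this (a sketch only; the actual caller is the ZipOutputEntry.cxx change further down):

    ZipUtils::ThreadedDeflater aDeflater(DEFAULT_COMPRESSION);
    aDeflater.deflateWrite(xInStream,
        [](const css::uno::Sequence<sal_Int8>& rIn, sal_Int32 nLen) {
            // invoked once per batch of raw input, e.g. to update a checksum
        },
        [](const css::uno::Sequence<sal_Int8>& rOut, sal_Int32 nLen) {
            // invoked once per batch of deflated output, e.g. to write it out
        });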
diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
index 19bbda01bbb7..73725c580c02 100644
--- a/package/source/zipapi/ThreadedDeflater.cxx
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
ThreadedDeflater* deflater;
int sequence;
int blockSize;
+ bool firstTask : 1;
+ bool lastTask : 1;
public:
- Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+ Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
+ bool lastTask_)
: comphelper::ThreadTask(deflater_->threadTaskTag)
, stream()
, deflater(deflater_)
, sequence(sequence_)
, blockSize(blockSize_)
+ , firstTask(firstTask_)
+ , lastTask(lastTask_)
{
}
@@ -61,58 +66,83 @@ private:
ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
: threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
+ , totalIn(0)
+ , totalOut(0)
, zlibLevel(nSetLevel)
- , pendingTasksCount(0)
{
}
-ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
-{
- waitForTasks();
- clear();
-}
+ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }
-void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+void ThreadedDeflater::deflateWrite(
+ const css::uno::Reference<css::io::XInputStream>& xInStream,
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+ std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
{
- inBuffer = rBuffer;
- sal_Int64 size = inBuffer.getLength();
- int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
- tasksCount = std::max(tasksCount, 1);
- pendingTasksCount = tasksCount;
- outBuffers.resize(pendingTasksCount);
- for (int sequence = 0; sequence < tasksCount; ++sequence)
+ sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
+ sal_Int64 batchSize = MaxBlockSize * nThreadCount;
+ inBuffer.realloc(batchSize);
+ prevDataBlock.realloc(MaxBlockSize);
+ outBuffers.resize(nThreadCount);
+ maProcessOutputFunc = aProcessOutputFunc;
+ bool firstTask = true;
+
+ while (xInStream->available() > 0)
{
- sal_Int64 thisSize = std::min(MaxBlockSize, size);
- size -= thisSize;
- comphelper::ThreadPool::getSharedOptimalPool().pushTask(
- std::make_unique<Task>(this, sequence, thisSize));
+ sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
+ aProcessInputFunc(inBuffer, inputBytes);
+ totalIn += inputBytes;
+ int sequence = 0;
+ bool lastBatch = xInStream->available() <= 0;
+ sal_Int64 bytesPending = inputBytes;
+ while (bytesPending > 0)
+ {
+ sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
+ bytesPending -= taskSize;
+ bool lastTask = lastBatch && !bytesPending;
+ comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+ std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));
+
+ if (firstTask)
+ firstTask = false;
+ }
+
+ assert(bytesPending == 0);
+
+ comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+
+ if (!lastBatch)
+ {
+ assert(inputBytes == batchSize);
+ std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
+ prevDataBlock.begin());
+ }
+
+ processDeflatedBuffers();
}
- assert(size == 0);
}
-bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
-
-css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+void ThreadedDeflater::processDeflatedBuffers()
{
- assert(finished());
- sal_Int64 totalSize = 0;
+ sal_Int64 batchOutputSize = 0;
for (const auto& buffer : outBuffers)
- totalSize += buffer.size();
- uno::Sequence<sal_Int8> outBuffer(totalSize);
+ batchOutputSize += buffer.size();
+
+ css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);
+
auto pos = outBuffer.begin();
- for (const auto& buffer : outBuffers)
+ for (auto& buffer : outBuffers)
+ {
pos = std::copy(buffer.begin(), buffer.end(), pos);
- return outBuffer;
-}
+ buffer.clear();
+ }
-void ThreadedDeflater::waitForTasks()
-{
- comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+ maProcessOutputFunc(outBuffer, batchOutputSize);
+ totalOut += batchOutputSize;
}
void ThreadedDeflater::clear()
{
- assert(finished());
inBuffer = uno::Sequence<sal_Int8>();
outBuffers.clear();
}
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
// zlib doesn't handle const properly
unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
const_cast<signed char*>(deflater->inBuffer.getConstArray()));
- if (sequence != 0)
+ if (!firstTask)
{
// the window size is 32k, so set last 32k of previous data as the dictionary
assert(MAX_WBITS == 15);
assert(MaxBlockSize >= 32768);
- deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+ if (sequence > 0)
+ {
+ deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+ }
+ else
+ {
+ unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
+ const_cast<signed char*>(deflater->prevDataBlock.getConstArray()));
+ deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
+ }
}
stream.next_in = inBufferPtr + myInBufferStart;
stream.avail_in = blockSize;
stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
stream.avail_out = outputMaxSize;
- bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
+
// The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
// and since we use a raw stream, the data blocks then can be simply concatenated.
- int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+ int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
assert(stream.avail_in == 0); // Check that everything has been deflated.
- if (last ? res == Z_STREAM_END : res == Z_OK)
+ if (lastTask ? res == Z_STREAM_END : res == Z_OK)
{ // ok
sal_Int64 outSize = outputMaxSize - stream.avail_out;
deflater->outBuffers[sequence].resize(outSize);
- --deflater->pendingTasksCount;
}
else
{
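The Z_SYNC_FLUSH/dictionary trick that doWork() relies on can be shown in isolation. A minimal zlib sketch, assuming a raw deflate stream; prevWindow, pBlock, isLast and the buffer sizes are illustrative names, not from the patch:

    z_stream strm = {};
    // Negative windowBits selects raw deflate: no zlib header/trailer, so the
    // byte-aligned blocks produced below can simply be concatenated.
    deflateInit2(&strm, nLevel, Z_DEFLATED, -MAX_WBITS, 8, Z_DEFAULT_STRATEGY);
    // Prime the 32k window with the tail of the preceding data, as the patch
    // does via deflateSetDictionary() for every task except the very first.
    deflateSetDictionary(&strm, prevWindow, 32768);
    strm.next_in = pBlock;
    strm.avail_in = nBlockLen;
    strm.next_out = pOut;
    strm.avail_out = nOutCap;
    // Z_SYNC_FLUSH aligns the output at a byte boundary; only the last block
    // of the whole stream is finished with Z_FINISH.
    deflate(&strm, isLast ? Z_FINISH : Z_SYNC_FLUSH);
    deflateEnd(&strm);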
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index bee9d0aeb70c..f08e687c43a4 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(
void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
{
- sal_Int64 toRead = xInStream->available();
- uno::Sequence< sal_Int8 > inBuffer( toRead );
- sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
- if (read < toRead)
- inBuffer.realloc( read );
- while( xInStream->available() > 0 )
- { // We didn't get the full size from available().
- uno::Sequence< sal_Int8 > buf( xInStream->available());
- read = xInStream->readBytes( buf, xInStream->available());
- sal_Int64 oldSize = inBuffer.getLength();
- inBuffer.realloc( oldSize + read );
- std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
- }
ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
- totalIn = inBuffer.getLength();
- deflater.startDeflate( inBuffer );
- processInput( inBuffer );
- deflater.waitForTasks();
- uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
- deflater.clear(); // release memory
- totalOut = outBuffer.getLength();
- processDeflated(outBuffer, outBuffer.getLength());
+ deflater.deflateWrite(xInStream,
+ [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+ if (!m_bEncryptCurrentEntry)
+ m_aCRC.updateSegment(rBuffer, nLen);
+ },
+ [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+ processDeflated(rBuffer, nLen);
+ }
+ );
+ totalIn = deflater.getTotalIn();
+ totalOut = deflater.getTotalOut();
closeEntry();
}