From 353d4528b8ad8abca9a13f3016632e42bab7afde Mon Sep 17 00:00:00 2001
From: Dennis Francis
Date: Sat, 11 Jan 2020 11:51:34 +0530
Subject: tdf#125662: do parallel-zip in batches
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In this approach, the input stream is read one batch (of constant size)
at a time, and each batch is compressed by ThreadedDeflater. After we
are done with a batch, the deflated buffer is processed straightaway
(directed to file-backed storage).

Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/86596
Tested-by: Jenkins
Reviewed-by: Luboš Luňák
---
 package/inc/ThreadedDeflater.hxx           |  29 ++++---
 package/source/zipapi/ThreadedDeflater.cxx | 118 +++++++++++++++++++----------
 package/source/zipapi/ZipOutputEntry.cxx   |  32 +++-----
 3 files changed, 109 insertions(+), 70 deletions(-)

diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx
index 3bd7e4bc966a..f22a40a0c941 100644
--- a/package/inc/ThreadedDeflater.hxx
+++ b/package/inc/ThreadedDeflater.hxx
@@ -21,37 +21,48 @@
 #define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX

 #include
+#include
+#include
 #include
 #include
 #include
 #include
+#include
+#include

 namespace ZipUtils
 {
 /// Parallel compression of a stream using the libz deflate algorithm.
 ///
-/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
-/// check with finished() or waitForTasks() and retrieve result with getOutput().
-/// The class will internally split into multiple threads.
+/// Call deflateWrite() with the input stream and input/output processing functions.
+/// This will use multiple threads for compression on each batch of data from the stream.
 class ThreadedDeflater final
 {
     class Task;
     // Note: All this should be lock-less. Each task writes only to its part
-    // of the data, flags are atomic.
+    // of the data.
     std::vector<std::vector<unsigned char>> outBuffers;
     std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
     css::uno::Sequence<sal_Int8> inBuffer;
+    css::uno::Sequence<sal_Int8> prevDataBlock;
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> maProcessOutputFunc;
+    sal_Int64 totalIn;
+    sal_Int64 totalOut;
     int zlibLevel;
-    std::atomic<int> pendingTasksCount;

 public:
     // Unlike with Deflater class, bNoWrap is always true.
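     // (bNoWrap means raw deflate output, without the zlib header and checksum,
     // which is what zip entries require.)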
     ThreadedDeflater(sal_Int32 nSetLevel);
     ~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE;
-    void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
-    void waitForTasks();
-    bool finished() const;
-    css::uno::Sequence<sal_Int8> getOutput() const;
+    void deflateWrite(
+        const css::uno::Reference<css::io::XInputStream>& xInStream,
+        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc);
+    sal_Int64 getTotalIn() const { return totalIn; }
+    sal_Int64 getTotalOut() const { return totalOut; }
+
+private:
+    void processDeflatedBuffers();
     void clear();
 };

diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx
index 19bbda01bbb7..73725c580c02 100644
--- a/package/source/zipapi/ThreadedDeflater.cxx
+++ b/package/source/zipapi/ThreadedDeflater.cxx
@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
     ThreadedDeflater* deflater;
     int sequence;
     int blockSize;
+    bool firstTask : 1;
+    bool lastTask : 1;

 public:
-    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
+         bool lastTask_)
         : comphelper::ThreadTask(deflater_->threadTaskTag)
         , stream()
         , deflater(deflater_)
         , sequence(sequence_)
         , blockSize(blockSize_)
+        , firstTask(firstTask_)
+        , lastTask(lastTask_)
     {
     }

@@ -61,58 +66,83 @@ private:
 ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
     : threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
+    , totalIn(0)
+    , totalOut(0)
     , zlibLevel(nSetLevel)
-    , pendingTasksCount(0)
 {
 }

-ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
-{
-    waitForTasks();
-    clear();
-}
+ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }

-void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+void ThreadedDeflater::deflateWrite(
+    const css::uno::Reference<css::io::XInputStream>& xInStream,
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
 {
-    inBuffer = rBuffer;
-    sal_Int64 size = inBuffer.getLength();
-    int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
-    tasksCount = std::max(tasksCount, 1);
-    pendingTasksCount = tasksCount;
-    outBuffers.resize(pendingTasksCount);
-    for (int sequence = 0; sequence < tasksCount; ++sequence)
+    sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
+    sal_Int64 batchSize = MaxBlockSize * nThreadCount;
+    inBuffer.realloc(batchSize);
+    prevDataBlock.realloc(MaxBlockSize);
+    outBuffers.resize(nThreadCount);
+    maProcessOutputFunc = aProcessOutputFunc;
+    bool firstTask = true;
+
+    while (xInStream->available() > 0)
     {
-        sal_Int64 thisSize = std::min(MaxBlockSize, size);
-        size -= thisSize;
-        comphelper::ThreadPool::getSharedOptimalPool().pushTask(
-            std::make_unique<Task>(this, sequence, thisSize));
+        sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
+        aProcessInputFunc(inBuffer, inputBytes);
+        totalIn += inputBytes;
+        int sequence = 0;
+        bool lastBatch = xInStream->available() <= 0;
+        sal_Int64 bytesPending = inputBytes;
+        while (bytesPending > 0)
+        {
+            sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
+            bytesPending -= taskSize;
+            bool lastTask = lastBatch && !bytesPending;
+            comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+                std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));
+
+            if (firstTask)
+                firstTask = false;
+        }
+
+        assert(bytesPending == 0);
+
+        comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+
+        if (!lastBatch)
+        {
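+            // Save the last block of this batch: the first task of the next
+            // batch seeds its 32k deflate dictionary from prevDataBlock.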
+            assert(inputBytes == batchSize);
+            std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
+                        prevDataBlock.begin());
+        }
+
+        processDeflatedBuffers();
     }
-    assert(size == 0);
 }

-bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
-
-css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+void ThreadedDeflater::processDeflatedBuffers()
 {
-    assert(finished());
-    sal_Int64 totalSize = 0;
+    sal_Int64 batchOutputSize = 0;
     for (const auto& buffer : outBuffers)
-        totalSize += buffer.size();
-    uno::Sequence<sal_Int8> outBuffer(totalSize);
+        batchOutputSize += buffer.size();
+
+    css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);
+
     auto pos = outBuffer.begin();
-    for (const auto& buffer : outBuffers)
+    for (auto& buffer : outBuffers)
+    {
         pos = std::copy(buffer.begin(), buffer.end(), pos);
-    return outBuffer;
-}
+        buffer.clear();
+    }

-void ThreadedDeflater::waitForTasks()
-{
-    comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+    maProcessOutputFunc(outBuffer, batchOutputSize);
+    totalOut += batchOutputSize;
 }

 void ThreadedDeflater::clear()
 {
-    assert(finished());
     inBuffer = uno::Sequence<sal_Int8>();
     outBuffers.clear();
 }
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
     // zlib doesn't handle const properly
     unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
         const_cast<sal_Int8*>(deflater->inBuffer.getConstArray()));
-    if (sequence != 0)
+    if (!firstTask)
     {
         // the window size is 32k, so set last 32k of previous data as the dictionary
         assert(MAX_WBITS == 15);
         assert(MaxBlockSize >= 32768);
-        deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+        if (sequence > 0)
+        {
+            deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+        }
+        else
+        {
+            unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
+                const_cast<sal_Int8*>(deflater->prevDataBlock.getConstArray()));
+            deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
+        }
     }
     stream.next_in = inBufferPtr + myInBufferStart;
     stream.avail_in = blockSize;
     stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
     stream.avail_out = outputMaxSize;

-    bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
+
     // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
     // and since we use a raw stream, the data blocks then can be simply concatenated.
-    int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+    int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
     assert(stream.avail_in == 0); // Check that everything has been deflated.
-    if (last ? res == Z_STREAM_END : res == Z_OK)
+    if (lastTask ? res == Z_STREAM_END : res == Z_OK)
     { // ok
         sal_Int64 outSize = outputMaxSize - stream.avail_out;
         deflater->outBuffers[sequence].resize(outSize);
-        --deflater->pendingTasksCount;
     }
     else
     {

diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index bee9d0aeb70c..f08e687c43a4 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(

 void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
 {
-    sal_Int64 toRead = xInStream->available();
-    uno::Sequence< sal_Int8 > inBuffer( toRead );
-    sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
-    if (read < toRead)
-        inBuffer.realloc( read );
-    while( xInStream->available() > 0 )
-    { // We didn't get the full size from available().
-        uno::Sequence< sal_Int8 > buf( xInStream->available());
-        read = xInStream->readBytes( buf, xInStream->available());
-        sal_Int64 oldSize = inBuffer.getLength();
-        inBuffer.realloc( oldSize + read );
-        std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
-    }
     ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
-    totalIn = inBuffer.getLength();
-    deflater.startDeflate( inBuffer );
-    processInput( inBuffer );
-    deflater.waitForTasks();
-    uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
-    deflater.clear(); // release memory
-    totalOut = outBuffer.getLength();
-    processDeflated(outBuffer, outBuffer.getLength());
+    deflater.deflateWrite(xInStream,
+        [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+            if (!m_bEncryptCurrentEntry)
+                m_aCRC.updateSegment(rBuffer, nLen);
+        },
+        [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+            processDeflated(rBuffer, nLen);
+        }
+    );
+    totalIn = deflater.getTotalIn();
+    totalOut = deflater.getTotalOut();
     closeEntry();
 }
--
cgit v1.2.3
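The technique in the patch above generalizes beyond LibreOffice: deflate
fixed-size blocks on worker threads as parts of one raw stream, byte-align
every non-final block with Z_SYNC_FLUSH so the outputs can simply be
concatenated, and prime each non-first block with the preceding 32k via
deflateSetDictionary. Below is a minimal standalone sketch of that idea. It
is illustrative only: the names kBlockSize, kDictSize, deflateBlock and
deflateBatch are invented here, plain std::thread stands in for LibreOffice's
comphelper::ThreadPool, and the CRC/stream plumbing of ZipOutputEntry is
omitted; only public zlib calls are used.

// deflate_batch_sketch.cxx -- illustrative sketch; build with: g++ deflate_batch_sketch.cxx -lz
#include <zlib.h>
#include <algorithm>
#include <cassert>
#include <cstring>
#include <thread>
#include <vector>

constexpr size_t kBlockSize = 256 * 1024; // per-task block (MaxBlockSize in the patch)
constexpr size_t kDictSize = 32768;       // the deflate window is 32k

// Deflate one block as a raw stream (negative windowBits == bNoWrap). A
// non-null dict primes the compressor with the 32k of data preceding this
// block, so back-references across block boundaries still resolve.
static std::vector<unsigned char> deflateBlock(const unsigned char* data, size_t len,
                                               const unsigned char* dict, bool last)
{
    z_stream strm;
    std::memset(&strm, 0, sizeof(strm));
    int res = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -MAX_WBITS, 8,
                           Z_DEFAULT_STRATEGY);
    assert(res == Z_OK);
    if (dict)
        deflateSetDictionary(&strm, dict, static_cast<uInt>(kDictSize));

    std::vector<unsigned char> out(deflateBound(&strm, len) + 64); // slack for the sync flush
    strm.next_in = const_cast<unsigned char*>(data);
    strm.avail_in = static_cast<uInt>(len);
    strm.next_out = out.data();
    strm.avail_out = static_cast<uInt>(out.size());
    // Z_SYNC_FLUSH byte-aligns the block so raw blocks can be concatenated;
    // the final block of the whole stream is terminated with Z_FINISH.
    res = deflate(&strm, last ? Z_FINISH : Z_SYNC_FLUSH);
    assert(last ? res == Z_STREAM_END : res == Z_OK);
    out.resize(out.size() - strm.avail_out);
    deflateEnd(&strm);
    return out;
}

// Deflate one batch: one thread per block, outputs concatenated in sequence
// order. prevBatchTail points at the last 32k of the previous batch (null for
// the very first batch), mirroring prevDataBlock in the patch.
static std::vector<unsigned char> deflateBatch(const std::vector<unsigned char>& batch,
                                               const unsigned char* prevBatchTail,
                                               bool lastBatch)
{
    const size_t nBlocks = (batch.size() + kBlockSize - 1) / kBlockSize;
    std::vector<std::vector<unsigned char>> outBuffers(nBlocks); // one slot per task, lock-free
    std::vector<std::thread> workers;
    for (size_t i = 0; i < nBlocks; ++i)
    {
        const size_t start = i * kBlockSize;
        const size_t len = std::min(kBlockSize, batch.size() - start);
        const unsigned char* dict = i > 0 ? batch.data() + start - kDictSize // tail of block i-1
                                          : prevBatchTail;
        const bool last = lastBatch && i == nBlocks - 1;
        workers.emplace_back([&outBuffers, &batch, i, start, len, dict, last] {
            outBuffers[i] = deflateBlock(batch.data() + start, len, dict, last);
        });
    }
    for (auto& w : workers)
        w.join();

    std::vector<unsigned char> result; // a valid raw deflate stream (or a prefix of one)
    for (const auto& buf : outBuffers)
        result.insert(result.end(), buf.begin(), buf.end());
    return result;
}

A caller would feed batches of kBlockSize x thread-count bytes to
deflateBatch and write each result out immediately, which is exactly what
keeps memory use bounded in the patch; every non-final batch must be full so
that prevBatchTail really is the 32k that precedes the next batch (the patch
asserts the same with inputBytes == batchSize). The concatenated output
inflates as a single raw stream via inflateInit2(&strm, -MAX_WBITS).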