summaryrefslogtreecommitdiff
path: root/package
diff options
context:
space:
mode:
authorLuboš Luňák <l.lunak@collabora.com>2019-05-24 22:05:30 +0200
committerLuboš Luňák <l.lunak@collabora.com>2019-05-28 12:27:48 +0200
commit7cd3f267cfbf3655f6a7a395b80560ecd22e15f7 (patch)
treee14173cddc4e4fb5b0709a7ae64d90316da0b76a /package
parentee22409ab6187d3545db71d255ec3866262baa6e (diff)
split out thread functionality from ZipOutputEntry
It can be easily separated out, it looked like hacked in. And I will need to do more refactoring of the class, so this shouldn't be more complex than necessary. Change-Id: I302da55409e9195274907ca4939c37fbb2427b18 Reviewed-on: https://gerrit.libreoffice.org/73031 Tested-by: Jenkins Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
Diffstat (limited to 'package')
-rw-r--r--package/inc/ZipOutputEntry.hxx51
-rw-r--r--package/inc/ZipOutputStream.hxx7
-rw-r--r--package/source/zipapi/Deflater.cxx6
-rw-r--r--package/source/zipapi/ZipOutputEntry.cxx172
-rw-r--r--package/source/zipapi/ZipOutputStream.cxx14
-rw-r--r--package/source/zippackage/ZipPackageStream.cxx66
6 files changed, 165 insertions, 151 deletions
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx
index c35da5818062..af6528f04ea1 100644
--- a/package/inc/ZipOutputEntry.hxx
+++ b/package/inc/ZipOutputEntry.hxx
@@ -27,6 +27,7 @@
#include <com/sun/star/xml/crypto/XDigestContext.hpp>
#include <package/Deflater.hxx>
+#include <comphelper/threadpool.hxx>
#include "CRC32.hxx"
#include <atomic>
@@ -36,25 +37,20 @@ class ZipPackageStream;
class ZipOutputEntry
{
- // allow only DeflateThreadTask to change m_bFinished using setFinished()
- friend class DeflateThreadTask;
-
+protected:
css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
ZipUtils::Deflater m_aDeflater;
css::uno::Reference< css::uno::XComponentContext > m_xContext;
- OUString m_aTempURL;
css::uno::Reference< css::io::XOutputStream > m_xOutStream;
css::uno::Reference< css::xml::crypto::XCipherContext > m_xCipherContext;
css::uno::Reference< css::xml::crypto::XDigestContext > m_xDigestContext;
- std::exception_ptr m_aParallelDeflateException;
CRC32 m_aCRC;
ZipEntry *m_pCurrentEntry;
sal_Int16 m_nDigested;
ZipPackageStream* m_pCurrentStream;
bool const m_bEncryptCurrentEntry;
- std::atomic<bool> m_bFinished;
public:
ZipOutputEntry(
@@ -62,32 +58,49 @@ public:
const css::uno::Reference< css::uno::XComponentContext >& rxContext,
ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
- ~ZipOutputEntry();
+ ZipEntry* getZipEntry() { return m_pCurrentEntry; }
+ ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
+ bool isEncrypt() { return m_bEncryptCurrentEntry; }
- /* This block of methods is for threaded zipping, where we compress to a temp stream, whose
- data is retrieved via getData */
+ void closeEntry();
+
+ void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream);
+ void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
+
+protected:
ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutStream,
+ const css::uno::Reference< css::uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream);
+ void doDeflate();
+};
+
+// Class that runs the compression in a thread.
+class ZipOutputEntryInThread : public ZipOutputEntry
+{
+ class Task;
+ OUString m_aTempURL;
+ std::exception_ptr m_aParallelDeflateException;
+ std::atomic<bool> m_bFinished;
+
+public:
+ ZipOutputEntryInThread(
const css::uno::Reference< css::uno::XComponentContext >& rxContext,
ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt);
+ std::unique_ptr<comphelper::ThreadTask> createTask(
+ const std::shared_ptr<comphelper::ThreadTaskTag>& pTag,
+ const css::uno::Reference< css::io::XInputStream >& xInStream );
+ /* This block of methods is for threaded zipping, where we compress to a temp stream, whose
+ data is retrieved via getData */
void createBufferFile();
void setParallelDeflateException(const std::exception_ptr& exception) { m_aParallelDeflateException = exception; }
css::uno::Reference< css::io::XInputStream > getData() const;
const std::exception_ptr& getParallelDeflateException() const { return m_aParallelDeflateException; }
void closeBufferFile();
void deleteBufferFile();
-
- ZipEntry* getZipEntry() { return m_pCurrentEntry; }
- ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
- bool isEncrypt() { return m_bEncryptCurrentEntry; }
-
- void closeEntry();
- void write(const css::uno::Sequence< sal_Int8 >& rBuffer);
-
bool isFinished() const { return m_bFinished; }
-
private:
void setFinished() { m_bFinished = true; }
- void doDeflate();
};
#endif
diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx
index ff7b66d64507..b527abde1443 100644
--- a/package/inc/ZipOutputStream.hxx
+++ b/package/inc/ZipOutputStream.hxx
@@ -29,6 +29,7 @@
struct ZipEntry;
class ZipOutputEntry;
+class ZipOutputEntryInThread;
class ZipPackageStream;
class ZipOutputStream
@@ -39,7 +40,7 @@ class ZipOutputStream
ByteChucker m_aChucker;
ZipEntry *m_pCurrentEntry;
- std::vector< ZipOutputEntry* > m_aEntries;
+ std::vector< ZipOutputEntryInThread* > m_aEntries;
std::exception_ptr m_aDeflateException;
public:
@@ -47,7 +48,7 @@ public:
const css::uno::Reference< css::io::XOutputStream > &xOStream );
~ZipOutputStream();
- void addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
+ void addDeflatingThreadTask( ZipOutputEntryInThread *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
/// @throws css::io::IOException
/// @throws css::uno::RuntimeException
@@ -79,7 +80,7 @@ private:
void writeEXT( const ZipEntry &rEntry );
// ScheduledThread handling helpers
- void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate);
+ void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread> pCandidate);
void consumeFinishedScheduledThreadTaskEntries();
public:
diff --git a/package/source/zipapi/Deflater.cxx b/package/source/zipapi/Deflater.cxx
index 8c02c4fa29a2..eacbbc9a67ac 100644
--- a/package/source/zipapi/Deflater.cxx
+++ b/package/source/zipapi/Deflater.cxx
@@ -100,7 +100,7 @@ void Deflater::setInputSegment( const uno::Sequence< sal_Int8 >& rBuffer )
nLength = rBuffer.getLength();
}
-bool Deflater::needsInput( )
+bool Deflater::needsInput() const
{
return nLength <=0;
}
@@ -113,11 +113,11 @@ sal_Int32 Deflater::doDeflateSegment( uno::Sequence< sal_Int8 >& rBuffer, sal_In
OSL_ASSERT( !(nNewLength < 0 || nNewLength > rBuffer.getLength()));
return doDeflateBytes(rBuffer, /*nNewOffset*/0, nNewLength);
}
-sal_Int64 Deflater::getTotalIn( )
+sal_Int64 Deflater::getTotalIn() const
{
return pStream->total_in; // FIXME64: zlib doesn't look 64bit clean here
}
-sal_Int64 Deflater::getTotalOut( )
+sal_Int64 Deflater::getTotalOut() const
{
return pStream->total_out; // FIXME64: zlib doesn't look 64bit clean here
}
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index 2b1447bd3246..74281fd063dd 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -46,7 +46,8 @@ ZipOutputEntry::ZipOutputEntry(
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
- bool bEncrypt)
+ bool bEncrypt,
+ bool checkStream)
: m_aDeflateBuffer(n_ConstBufferSize)
, m_aDeflater(DEFAULT_COMPRESSION, true)
, m_xContext(rxContext)
@@ -55,10 +56,10 @@ ZipOutputEntry::ZipOutputEntry(
, m_nDigested(0)
, m_pCurrentStream(pStream)
, m_bEncryptCurrentEntry(bEncrypt)
-, m_bFinished(false)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
- assert(m_xOutStream.is());
+ (void)checkStream;
+ assert(!checkStream || m_xOutStream.is());
if (m_bEncryptCurrentEntry)
{
m_xCipherContext = ZipFile::StaticGetCipher( m_xContext, pStream->GetEncryptionData(), true );
@@ -67,64 +68,13 @@ ZipOutputEntry::ZipOutputEntry(
}
ZipOutputEntry::ZipOutputEntry(
+ const css::uno::Reference< css::io::XOutputStream >& rxOutput,
const uno::Reference< uno::XComponentContext >& rxContext,
ZipEntry& rEntry,
ZipPackageStream* pStream,
bool bEncrypt)
-: m_aDeflateBuffer(n_ConstBufferSize)
-, m_aDeflater(DEFAULT_COMPRESSION, true)
-, m_xContext(rxContext)
-, m_pCurrentEntry(&rEntry)
-, m_nDigested(0)
-, m_pCurrentStream(pStream)
-, m_bEncryptCurrentEntry(bEncrypt)
-, m_bFinished(false)
+: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true)
{
- assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
- if (m_bEncryptCurrentEntry)
- {
- m_xCipherContext = ZipFile::StaticGetCipher( m_xContext, pStream->GetEncryptionData(), true );
- m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( m_xContext, pStream->GetEncryptionData() );
- }
-}
-
-ZipOutputEntry::~ZipOutputEntry()
-{
-}
-
-void ZipOutputEntry::createBufferFile()
-{
- assert(!m_xOutStream.is() && m_aTempURL.isEmpty() &&
- "should only be called in the threaded mode where there is no existing stream yet");
- uno::Reference < beans::XPropertySet > xTempFileProps(
- io::TempFile::create(m_xContext),
- uno::UNO_QUERY_THROW );
- xTempFileProps->setPropertyValue("RemoveFile", uno::makeAny(false));
- uno::Any aUrl = xTempFileProps->getPropertyValue( "Uri" );
- aUrl >>= m_aTempURL;
- assert(!m_aTempURL.isEmpty());
-
- uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
- m_xOutStream = xTempAccess->openFileWrite(m_aTempURL);
-}
-
-void ZipOutputEntry::closeBufferFile()
-{
- m_xOutStream->closeOutput();
- m_xOutStream.clear();
-}
-
-void ZipOutputEntry::deleteBufferFile()
-{
- assert(!m_xOutStream.is() && !m_aTempURL.isEmpty());
- uno::Reference < ucb::XSimpleFileAccess3 > xAccess(ucb::SimpleFileAccess::create(m_xContext));
- xAccess->kill(m_aTempURL);
-}
-
-uno::Reference< io::XInputStream > ZipOutputEntry::getData() const
-{
- uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
- return xTempAccess->openFileRead(m_aTempURL);
}
void ZipOutputEntry::closeEntry()
@@ -241,4 +191,114 @@ void ZipOutputEntry::doDeflate()
}
}
+ZipOutputEntryInThread::ZipOutputEntryInThread(
+ const uno::Reference< uno::XComponentContext >& rxContext,
+ ZipEntry& rEntry,
+ ZipPackageStream* pStream,
+ bool bEncrypt)
+: ZipOutputEntry( uno::Reference< css::io::XOutputStream >(), rxContext, rEntry, pStream, bEncrypt, false )
+, m_bFinished(false)
+{
+}
+
+void ZipOutputEntryInThread::createBufferFile()
+{
+ assert(!m_xOutStream.is() && m_aTempURL.isEmpty() &&
+ "should only be called in the threaded mode where there is no existing stream yet");
+ uno::Reference < beans::XPropertySet > xTempFileProps(
+ io::TempFile::create(m_xContext),
+ uno::UNO_QUERY_THROW );
+ xTempFileProps->setPropertyValue("RemoveFile", uno::makeAny(false));
+ uno::Any aUrl = xTempFileProps->getPropertyValue( "Uri" );
+ aUrl >>= m_aTempURL;
+ assert(!m_aTempURL.isEmpty());
+
+ uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
+ m_xOutStream = xTempAccess->openFileWrite(m_aTempURL);
+}
+
+void ZipOutputEntryInThread::closeBufferFile()
+{
+ m_xOutStream->closeOutput();
+ m_xOutStream.clear();
+}
+
+void ZipOutputEntryInThread::deleteBufferFile()
+{
+ assert(!m_xOutStream.is() && !m_aTempURL.isEmpty());
+ uno::Reference < ucb::XSimpleFileAccess3 > xAccess(ucb::SimpleFileAccess::create(m_xContext));
+ xAccess->kill(m_aTempURL);
+}
+
+uno::Reference< io::XInputStream > ZipOutputEntryInThread::getData() const
+{
+ uno::Reference < ucb::XSimpleFileAccess3 > xTempAccess(ucb::SimpleFileAccess::create(m_xContext));
+ return xTempAccess->openFileRead(m_aTempURL);
+}
+
+class ZipOutputEntryInThread::Task : public comphelper::ThreadTask
+{
+ ZipOutputEntryInThread *mpEntry;
+ uno::Reference< io::XInputStream > mxInStream;
+
+public:
+ Task( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntryInThread *pEntry,
+ const uno::Reference< io::XInputStream >& xInStream )
+ : comphelper::ThreadTask(pTag)
+ , mpEntry(pEntry)
+ , mxInStream(xInStream)
+ {}
+
+private:
+ virtual void doWork() override
+ {
+ try
+ {
+ mpEntry->createBufferFile();
+ mpEntry->writeStream(mxInStream);
+ mxInStream.clear();
+ mpEntry->closeBufferFile();
+ mpEntry->setFinished();
+ }
+ catch (...)
+ {
+ mpEntry->setParallelDeflateException(std::current_exception());
+ try
+ {
+ if (mpEntry->m_xOutStream.is())
+ mpEntry->closeBufferFile();
+ if (!mpEntry->m_aTempURL.isEmpty())
+ mpEntry->deleteBufferFile();
+ }
+ catch (uno::Exception const&)
+ {
+ }
+ mpEntry->setFinished();
+ }
+ }
+};
+
+std::unique_ptr<comphelper::ThreadTask> ZipOutputEntryInThread::createTask(
+ const std::shared_ptr<comphelper::ThreadTaskTag>& pTag,
+ const uno::Reference< io::XInputStream >& xInStream )
+{
+ return std::make_unique<Task>(pTag, this, xInStream);
+}
+
+void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInStream)
+{
+ sal_Int32 nLength = 0;
+ uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
+ do
+ {
+ nLength = xInStream->readBytes(aSeq, n_ConstBufferSize);
+ if (nLength != n_ConstBufferSize)
+ aSeq.realloc(nLength);
+
+ write(aSeq);
+ }
+ while (nLength == n_ConstBufferSize);
+ closeEntry();
+}
+
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index 5d90224981b0..8ea040bd55e3 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -68,7 +68,7 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry )
}
}
-void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
+void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntryInThread *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
{
comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pTask));
m_aEntries.push_back(pEntry);
@@ -91,7 +91,7 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = nullptr;
}
-void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate)
+void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread> pCandidate)
{
//Any exceptions thrown in the threads were caught and stored for now
const std::exception_ptr& rCaughtException(pCandidate->getParallelDeflateException());
@@ -126,13 +126,13 @@ void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputE
void ZipOutputStream::consumeFinishedScheduledThreadTaskEntries()
{
- std::vector< ZipOutputEntry* > aNonFinishedEntries;
+ std::vector< ZipOutputEntryInThread* > aNonFinishedEntries;
- for(ZipOutputEntry* pEntry : m_aEntries)
+ for(ZipOutputEntryInThread* pEntry : m_aEntries)
{
if(pEntry->isFinished())
{
- consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pEntry));
+ consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread>(pEntry));
}
else
{
@@ -167,9 +167,9 @@ void ZipOutputStream::finish()
// consume all processed entries
while(!m_aEntries.empty())
{
- ZipOutputEntry* pCandidate = m_aEntries.back();
+ ZipOutputEntryInThread* pCandidate = m_aEntries.back();
m_aEntries.pop_back();
- consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pCandidate));
+ consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntryInThread>(pCandidate));
}
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index 569368160f7f..e795776ab065 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -17,7 +17,6 @@
* the License at http://www.apache.org/licenses/LICENSE-2.0 .
*/
-#include <memory>
#include <ZipPackageStream.hxx>
#include <com/sun/star/beans/PropertyValue.hpp>
@@ -431,65 +430,6 @@ bool ZipPackageStream::ParsePackageRawStream()
return true;
}
-static void deflateZipEntry(ZipOutputEntry *pZipEntry,
- const uno::Reference< io::XInputStream >& xInStream)
-{
- sal_Int32 nLength = 0;
- uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
- do
- {
- nLength = xInStream->readBytes(aSeq, n_ConstBufferSize);
- if (nLength != n_ConstBufferSize)
- aSeq.realloc(nLength);
-
- pZipEntry->write(aSeq);
- }
- while (nLength == n_ConstBufferSize);
- pZipEntry->closeEntry();
-}
-
-class DeflateThreadTask: public comphelper::ThreadTask
-{
- ZipOutputEntry *mpEntry;
- uno::Reference< io::XInputStream > mxInStream;
-
-public:
- DeflateThreadTask( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry,
- const uno::Reference< io::XInputStream >& xInStream )
- : comphelper::ThreadTask(pTag)
- , mpEntry(pEntry)
- , mxInStream(xInStream)
- {}
-
-private:
- virtual void doWork() override
- {
- try
- {
- mpEntry->createBufferFile();
- deflateZipEntry(mpEntry, mxInStream);
- mxInStream.clear();
- mpEntry->closeBufferFile();
- mpEntry->setFinished();
- }
- catch (...)
- {
- mpEntry->setParallelDeflateException(std::current_exception());
- try
- {
- if (mpEntry->m_xOutStream.is())
- mpEntry->closeBufferFile();
- if (!mpEntry->m_aTempURL.isEmpty())
- mpEntry->deleteBufferFile();
- }
- catch (uno::Exception const&)
- {
- }
- mpEntry->setFinished();
- }
- }
-};
-
static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> const & rStream )
{
// It's very annoying that we have to do this, but lots of zip packages
@@ -839,16 +779,16 @@ bool ZipPackageStream::saveChild(
rZipOut.reduceScheduledThreadTasksToGivenNumberOrLess(nAllowedTasks);
// Start a new thread task deflating this zip entry
- ZipOutputEntry *pZipEntry = new ZipOutputEntry(
+ ZipOutputEntryInThread *pZipEntry = new ZipOutputEntryInThread(
m_xContext, *pTempEntry, this, bToBeEncrypted);
rZipOut.addDeflatingThreadTask( pZipEntry,
- std::make_unique<DeflateThreadTask>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) );
+ pZipEntry->createTask( rZipOut.getThreadTaskTag(), xStream) );
}
else
{
rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted);
- deflateZipEntry(&aZipEntry, xStream);
+ aZipEntry.writeStream(xStream);
rZipOut.rawCloseEntry(bToBeEncrypted);
}
}