summaryrefslogtreecommitdiff
path: root/package
diff options
context:
space:
mode:
authorMatúš Kukan <matus.kukan@collabora.com>2014-10-21 15:17:13 +0200
committerMatúš Kukan <matus.kukan@collabora.com>2014-11-17 10:49:23 +0100
commitfbf714b45625c50bb1c736ef231b5dbbab0016a1 (patch)
tree0e1a9e9002a8ce8ca46d7a7071f40c08ffea77e4 /package
parentdb5552631b13e5a1d330929cd5093bd0f9894ec8 (diff)
package: Finally implement parallel zip entries deflating
For that: 1, create ZipPackageStream::successfullyWritten to be called after the content is written 2, Do not take mutex when reading from WrapStreamForShare - threads should be using different streams anyway, but there is only one common mutex. :-/ Change-Id: I90303e49206b19454dd4141e24cc8be29c433045
Diffstat (limited to 'package')
-rw-r--r--package/inc/ZipOutputEntry.hxx3
-rw-r--r--package/inc/ZipOutputStream.hxx7
-rw-r--r--package/inc/ZipPackageStream.hxx6
-rw-r--r--package/source/zipapi/ZipOutputEntry.cxx3
-rw-r--r--package/source/zipapi/ZipOutputStream.cxx37
-rw-r--r--package/source/zippackage/ZipPackageStream.cxx133
-rw-r--r--package/source/zippackage/wrapstreamforshare.cxx4
7 files changed, 121 insertions, 72 deletions
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx
index c24d5a905bfe..9e396ce4dc7b 100644
--- a/package/inc/ZipOutputEntry.hxx
+++ b/package/inc/ZipOutputEntry.hxx
@@ -54,6 +54,9 @@ public:
~ZipOutputEntry();
css::uno::Sequence< sal_Int8 > getData();
+ ZipEntry* getZipEntry() { return m_pCurrentEntry; }
+ ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
+ bool isEncrypt() { return m_bEncryptCurrentEntry; }
void closeEntry();
void write(const css::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength);
diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx
index f11b8833d146..4e8e4ff150be 100644
--- a/package/inc/ZipOutputStream.hxx
+++ b/package/inc/ZipOutputStream.hxx
@@ -23,10 +23,12 @@
#include <com/sun/star/io/XOutputStream.hpp>
#include <ByteChucker.hxx>
+#include <comphelper/threadpool.hxx>
#include <vector>
struct ZipEntry;
+class ZipOutputEntry;
class ZipPackageStream;
class ZipOutputStream
@@ -35,14 +37,17 @@ class ZipOutputStream
::std::vector < ZipEntry * > m_aZipList;
ByteChucker m_aChucker;
- bool m_bFinished;
ZipEntry *m_pCurrentEntry;
+ comphelper::ThreadPool &m_rSharedThreadPool;
+ std::vector< ZipOutputEntry* > m_aEntries;
public:
ZipOutputStream(
const ::com::sun::star::uno::Reference< ::com::sun::star::io::XOutputStream > &xOStream );
~ZipOutputStream();
+ void addDeflatingThread( ZipOutputEntry *pEntry, comphelper::ThreadTask *pThreadTask );
+
void writeLOC( ZipEntry *pEntry, bool bEncrypt = false )
throw(::com::sun::star::io::IOException, ::com::sun::star::uno::RuntimeException);
void rawWrite( ::com::sun::star::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength )
diff --git a/package/inc/ZipPackageStream.hxx b/package/inc/ZipPackageStream.hxx
index 356d42b78f11..ff6d3db1f6f9 100644
--- a/package/inc/ZipPackageStream.hxx
+++ b/package/inc/ZipPackageStream.hxx
@@ -63,14 +63,13 @@ private:
sal_uInt8 m_nStreamMode;
sal_uInt32 m_nMagicalHackPos;
sal_uInt32 m_nMagicalHackSize;
+ sal_Int64 m_nOwnStreamOrigSize;
bool m_bHasSeekable;
-
bool m_bCompressedIsSetFromOutside;
-
bool m_bFromManifest;
-
bool m_bUseWinEncoding;
+ bool m_bRawStream;
::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > GetOwnSeekStream();
@@ -138,6 +137,7 @@ public:
void setZipEntryOnLoading( const ZipEntry &rInEntry);
::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > SAL_CALL getRawData()
throw(::com::sun::star::uno::RuntimeException);
+ void successfullyWritten( ZipEntry *pEntry );
static ::com::sun::star::uno::Sequence < sal_Int8 > static_getImplementationId();
diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx
index f43b5c7e843e..a5fbe25eef61 100644
--- a/package/source/zipapi/ZipOutputEntry.cxx
+++ b/package/source/zipapi/ZipOutputEntry.cxx
@@ -47,14 +47,13 @@ ZipOutputEntry::ZipOutputEntry( const uno::Reference< uno::XComponentContext >&
, m_pCurrentEntry(&rEntry)
, m_nDigested(0)
, m_bEncryptCurrentEntry(bEncrypt)
-, m_pCurrentStream(NULL)
+, m_pCurrentStream(pStream)
{
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
if (m_bEncryptCurrentEntry)
{
m_xCipherContext = ZipFile::StaticGetCipher( rxContext, pStream->GetEncryptionData(), true );
m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( rxContext, pStream->GetEncryptionData() );
- m_pCurrentStream = pStream;
}
}
diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx
index c9b6e08cfad7..fcfe35f82070 100644
--- a/package/source/zipapi/ZipOutputStream.cxx
+++ b/package/source/zipapi/ZipOutputStream.cxx
@@ -27,6 +27,7 @@
#include <PackageConstants.hxx>
#include <ZipEntry.hxx>
+#include <ZipOutputEntry.hxx>
#include <ZipPackageStream.hxx>
using namespace com::sun::star;
@@ -39,15 +40,13 @@ using namespace com::sun::star::packages::zip::ZipConstants;
ZipOutputStream::ZipOutputStream( const uno::Reference < io::XOutputStream > &xOStream )
: m_xStream(xOStream)
, m_aChucker(xOStream)
-, m_bFinished(false)
, m_pCurrentEntry(NULL)
+, m_rSharedThreadPool(comphelper::ThreadPool::getSharedOptimalPool())
{
}
ZipOutputStream::~ZipOutputStream( void )
{
- for (sal_Int32 i = 0, nEnd = m_aZipList.size(); i < nEnd; i++)
- delete m_aZipList[i];
}
void ZipOutputStream::setEntry( ZipEntry *pEntry )
@@ -66,6 +65,12 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry )
}
}
+void ZipOutputStream::addDeflatingThread( ZipOutputEntry *pEntry, comphelper::ThreadTask *pThread )
+{
+ m_rSharedThreadPool.pushTask(pThread);
+ m_aEntries.push_back(pEntry);
+}
+
void ZipOutputStream::rawWrite( Sequence< sal_Int8 >& rBuffer, sal_Int32 /*nNewOffset*/, sal_Int32 nNewLength )
throw(IOException, RuntimeException)
{
@@ -85,21 +90,33 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = NULL;
}
-void ZipOutputStream::finish( )
+void ZipOutputStream::finish()
throw(IOException, RuntimeException)
{
- if (m_bFinished)
- return;
+ assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
+
+ // Wait for all threads to finish & write
+ m_rSharedThreadPool.waitUntilEmpty();
+ for (size_t i = 0; i < m_aEntries.size(); i++)
+ {
+ writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt());
+ uno::Sequence< sal_Int8 > aCompressedData = m_aEntries[i]->getData();
+ rawWrite(aCompressedData, 0, aCompressedData.getLength());
+ rawCloseEntry(m_aEntries[i]->isEncrypt());
- if (m_aZipList.size() < 1)
- OSL_FAIL("Zip file must have at least one entry!\n");
+ m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry());
+ delete m_aEntries[i];
+ }
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
- for (sal_Int32 i =0, nEnd = m_aZipList.size(); i < nEnd; i++)
+ for (size_t i = 0; i < m_aZipList.size(); i++)
+ {
writeCEN( *m_aZipList[i] );
+ delete m_aZipList[i];
+ }
writeEND( nOffset, static_cast < sal_Int32 > (m_aChucker.GetPosition()) - nOffset);
- m_bFinished = true;
m_xStream->flush();
+ m_aZipList.clear();
}
void ZipOutputStream::writeEND(sal_uInt32 nOffset, sal_uInt32 nLength)
diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx
index ca2ad01d0bb1..9f29a6800ecc 100644
--- a/package/source/zippackage/ZipPackageStream.cxx
+++ b/package/source/zippackage/ZipPackageStream.cxx
@@ -90,10 +90,12 @@ ZipPackageStream::ZipPackageStream ( ZipPackage & rNewPackage,
, m_nStreamMode( PACKAGE_STREAM_NOTSET )
, m_nMagicalHackPos( 0 )
, m_nMagicalHackSize( 0 )
+, m_nOwnStreamOrigSize( 0 )
, m_bHasSeekable( false )
, m_bCompressedIsSetFromOutside( false )
, m_bFromManifest( false )
, m_bUseWinEncoding( false )
+, m_bRawStream( false )
{
m_xContext = xContext;
m_nFormat = nFormat;
@@ -437,6 +439,35 @@ bool ZipPackageStream::ParsePackageRawStream()
return true;
}
+class DeflateThread: public comphelper::ThreadTask
+{
+ ZipOutputEntry *mpEntry;
+ uno::Reference< io::XInputStream > mxInStream;
+
+public:
+ DeflateThread( ZipOutputEntry *pEntry,
+ const uno::Reference< io::XInputStream >& xInStream )
+ : mpEntry(pEntry)
+ , mxInStream(xInStream)
+ {}
+
+private:
+ virtual void doWork() SAL_OVERRIDE
+ {
+ sal_Int32 nLength = 0;
+ uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
+ do
+ {
+ nLength = mxInStream->readBytes(aSeq, n_ConstBufferSize);
+ mpEntry->write(aSeq, 0, nLength);
+ }
+ while (nLength == n_ConstBufferSize);
+ mpEntry->closeEntry();
+
+ mxInStream.clear();
+ }
+};
+
static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> & rStream )
{
// It's very annoying that we have to do this, but lots of zip packages
@@ -497,20 +528,21 @@ bool ZipPackageStream::saveChild(
OSL_ENSURE( m_nStreamMode != PACKAGE_STREAM_NOTSET, "Unacceptable ZipPackageStream mode!" );
- bool bRawStream = false;
+ m_bRawStream = false;
if ( m_nStreamMode == PACKAGE_STREAM_DETECT )
- bRawStream = ParsePackageRawStream();
+ m_bRawStream = ParsePackageRawStream();
else if ( m_nStreamMode == PACKAGE_STREAM_RAW )
- bRawStream = true;
+ m_bRawStream = true;
+ bool bParallelDeflate = false;
bool bTransportOwnEncrStreamAsRaw = false;
// During the storing the original size of the stream can be changed
// TODO/LATER: get rid of this hack
- sal_Int64 nOwnStreamOrigSize = bRawStream ? m_nMagicalHackSize : aEntry.nSize;
+ m_nOwnStreamOrigSize = m_bRawStream ? m_nMagicalHackSize : aEntry.nSize;
bool bUseNonSeekableAccess = false;
uno::Reference < io::XInputStream > xStream;
- if ( !IsPackageMember() && !bRawStream && !bToBeEncrypted && bToBeCompressed )
+ if ( !IsPackageMember() && !m_bRawStream && !bToBeEncrypted && bToBeCompressed )
{
// the stream is not a package member, not a raw stream,
// it should not be encrypted and it should be compressed,
@@ -540,11 +572,11 @@ bool ZipPackageStream::saveChild(
{
// If the stream is a raw one, then we should be positioned
// at the beginning of the actual data
- if ( !bToBeCompressed || bRawStream )
+ if ( !bToBeCompressed || m_bRawStream )
{
// The raw stream can neither be encrypted nor connected
- OSL_ENSURE( !bRawStream || !(bToBeCompressed || bToBeEncrypted), "The stream is already encrypted!\n" );
- xSeek->seek ( bRawStream ? m_nMagicalHackPos : 0 );
+ OSL_ENSURE( !m_bRawStream || !(bToBeCompressed || bToBeEncrypted), "The stream is already encrypted!\n" );
+ xSeek->seek ( m_bRawStream ? m_nMagicalHackPos : 0 );
ImplSetStoredData ( *pTempEntry, xStream );
// TODO/LATER: Get rid of hacks related to switching of Flag Method and Size properties!
@@ -553,7 +585,7 @@ bool ZipPackageStream::saveChild(
{
// this is the correct original size
pTempEntry->nSize = xSeek->getLength();
- nOwnStreamOrigSize = pTempEntry->nSize;
+ m_nOwnStreamOrigSize = pTempEntry->nSize;
}
xSeek->seek ( 0 );
@@ -592,7 +624,7 @@ bool ZipPackageStream::saveChild(
return bSuccess;
}
- if ( bToBeEncrypted || bRawStream || bTransportOwnEncrStreamAsRaw )
+ if ( bToBeEncrypted || m_bRawStream || bTransportOwnEncrStreamAsRaw )
{
if ( bToBeEncrypted && !bTransportOwnEncrStreamAsRaw )
{
@@ -624,11 +656,11 @@ bool ZipPackageStream::saveChild(
aPropSet[PKG_MNFST_ITERATION].Value <<= m_xBaseEncryptionData->m_nIterationCount;
// Need to store the uncompressed size in the manifest
- OSL_ENSURE( nOwnStreamOrigSize >= 0, "The stream size was not correctly initialized!\n" );
+ OSL_ENSURE( m_nOwnStreamOrigSize >= 0, "The stream size was not correctly initialized!\n" );
aPropSet[PKG_MNFST_UCOMPSIZE].Name = sSizeProperty;
- aPropSet[PKG_MNFST_UCOMPSIZE].Value <<= nOwnStreamOrigSize;
+ aPropSet[PKG_MNFST_UCOMPSIZE].Value <<= m_nOwnStreamOrigSize;
- if ( bRawStream || bTransportOwnEncrStreamAsRaw )
+ if ( m_bRawStream || bTransportOwnEncrStreamAsRaw )
{
::rtl::Reference< EncryptionData > xEncData = GetEncryptionData();
if ( !xEncData.is() )
@@ -651,7 +683,7 @@ bool ZipPackageStream::saveChild(
// If the entry is already stored in the zip file in the format we
// want for this write...copy it raw
if ( !bUseNonSeekableAccess
- && ( bRawStream || bTransportOwnEncrStreamAsRaw
+ && ( m_bRawStream || bTransportOwnEncrStreamAsRaw
|| ( IsPackageMember() && !bToBeEncrypted
&& ( ( aEntry.nMethod == DEFLATED && bToBeCompressed )
|| ( aEntry.nMethod == STORED && !bToBeCompressed ) ) ) ) )
@@ -671,7 +703,7 @@ bool ZipPackageStream::saveChild(
try
{
- if ( bRawStream )
+ if ( m_bRawStream )
xStream->skipBytes( m_nMagicalHackPos );
ZipOutputStream::setEntry(pTempEntry);
@@ -733,35 +765,29 @@ bool ZipPackageStream::saveChild(
try
{
ZipOutputStream::setEntry(pTempEntry);
- rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
// the entry is provided to the ZipOutputStream that will delete it
pAutoTempEntry.release();
- sal_Int32 nLength;
- uno::Sequence < sal_Int8 > aSeq (n_ConstBufferSize);
if (pTempEntry->nMethod == STORED)
{
+ sal_Int32 nLength;
+ uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
+ rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
do
{
nLength = xStream->readBytes(aSeq, n_ConstBufferSize);
rZipOut.rawWrite(aSeq, 0, nLength);
}
while ( nLength == n_ConstBufferSize );
+ rZipOut.rawCloseEntry(bToBeEncrypted);
}
else
{
- ZipOutputEntry aZipEntry(m_xContext, *pTempEntry, this, bToBeEncrypted);
- do
- {
- nLength = xStream->readBytes(aSeq, n_ConstBufferSize);
- aZipEntry.write(aSeq, 0, nLength);
- }
- while ( nLength == n_ConstBufferSize );
- aZipEntry.closeEntry();
- uno::Sequence< sal_Int8 > aCompressedData = aZipEntry.getData();
- rZipOut.rawWrite(aCompressedData, 0, aCompressedData.getLength());
+ bParallelDeflate = true;
+ // Start a new thread deflating this zip entry
+ ZipOutputEntry *pZipEntry = new ZipOutputEntry(m_xContext, *pTempEntry, this, bToBeEncrypted);
+ rZipOut.addDeflatingThread( pZipEntry, new DeflateThread(pZipEntry, xStream) );
}
- rZipOut.rawCloseEntry(bToBeEncrypted);
}
catch ( ZipException& )
{
@@ -793,36 +819,39 @@ bool ZipPackageStream::saveChild(
}
}
- if( bSuccess )
- {
- if ( !IsPackageMember() )
- {
- CloseOwnStreamIfAny();
- SetPackageMember ( true );
- }
+ if (bSuccess && !bParallelDeflate)
+ successfullyWritten(pTempEntry);
- if ( bRawStream )
- {
- // the raw stream was integrated and now behaves
- // as usual encrypted stream
- SetToBeEncrypted( true );
- }
+ if ( aPropSet.getLength()
+ && ( m_nFormat == embed::StorageFormats::PACKAGE || m_nFormat == embed::StorageFormats::OFOPXML ) )
+ rManList.push_back( aPropSet );
- // Then copy it back afterwards...
- ZipPackageFolder::copyZipEntry ( aEntry, *pTempEntry );
+ return bSuccess;
+}
- // TODO/LATER: get rid of this hack ( the encrypted stream size property is changed during saving )
- if ( IsEncrypted() )
- setSize( nOwnStreamOrigSize );
+void ZipPackageStream::successfullyWritten( ZipEntry *pEntry )
+{
+ if ( !IsPackageMember() )
+ {
+ CloseOwnStreamIfAny();
+ SetPackageMember ( true );
+ }
- aEntry.nOffset *= -1;
+ if ( m_bRawStream )
+ {
+ // the raw stream was integrated and now behaves
+ // as usual encrypted stream
+ SetToBeEncrypted( true );
}
- if ( aPropSet.getLength()
- && ( m_nFormat == embed::StorageFormats::PACKAGE || m_nFormat == embed::StorageFormats::OFOPXML ) )
- rManList.push_back( aPropSet );
+ // Then copy it back afterwards...
+ ZipPackageFolder::copyZipEntry( aEntry, *pEntry );
- return bSuccess;
+ // TODO/LATER: get rid of this hack ( the encrypted stream size property is changed during saving )
+ if ( IsEncrypted() )
+ setSize( m_nOwnStreamOrigSize );
+
+ aEntry.nOffset *= -1;
}
void ZipPackageStream::SetPackageMember( bool bNewValue )
diff --git a/package/source/zippackage/wrapstreamforshare.cxx b/package/source/zippackage/wrapstreamforshare.cxx
index 4f737bfa2964..c74e4b2de18b 100644
--- a/package/source/zippackage/wrapstreamforshare.cxx
+++ b/package/source/zippackage/wrapstreamforshare.cxx
@@ -54,8 +54,6 @@ sal_Int32 SAL_CALL WrapStreamForShare::readBytes( uno::Sequence< sal_Int8 >& aDa
io::IOException,
uno::RuntimeException, std::exception )
{
- ::osl::MutexGuard aGuard( m_rMutexRef->GetMutex() );
-
if ( !m_xInStream.is() )
throw io::IOException(THROW_WHERE );
@@ -73,8 +71,6 @@ sal_Int32 SAL_CALL WrapStreamForShare::readSomeBytes( uno::Sequence< sal_Int8 >&
io::IOException,
uno::RuntimeException, std::exception )
{
- ::osl::MutexGuard aGuard( m_rMutexRef->GetMutex() );
-
if ( !m_xInStream.is() )
throw io::IOException(THROW_WHERE );