summaryrefslogtreecommitdiff
path: root/io/source/stm/opump.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'io/source/stm/opump.cxx')
-rw-r--r--io/source/stm/opump.cxx504
1 files changed, 504 insertions, 0 deletions
diff --git a/io/source/stm/opump.cxx b/io/source/stm/opump.cxx
new file mode 100644
index 000000000000..384c080ca52a
--- /dev/null
+++ b/io/source/stm/opump.cxx
@@ -0,0 +1,504 @@
+/*************************************************************************
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * Copyright 2000, 2010 Oracle and/or its affiliates.
+ *
+ * OpenOffice.org - a multi-platform office productivity suite
+ *
+ * This file is part of OpenOffice.org.
+ *
+ * OpenOffice.org is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License version 3
+ * only, as published by the Free Software Foundation.
+ *
+ * OpenOffice.org is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License version 3 for more details
+ * (a copy is included in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * version 3 along with OpenOffice.org. If not, see
+ * <http://www.openoffice.org/license.html>
+ * for a copy of the LGPLv3 License.
+ *
+ ************************************************************************/
+
+// MARKER(update_precomp.py): autogen include statement, do not remove
+#include "precompiled_io.hxx"
+
+#include <stdio.h>
+
+#include <osl/diagnose.h>
+
+#include <com/sun/star/io/XActiveDataSource.hpp>
+#include <com/sun/star/io/XActiveDataSink.hpp>
+#include <com/sun/star/io/XActiveDataControl.hpp>
+#include <com/sun/star/io/XConnectable.hpp>
+#include <com/sun/star/lang/XSingleServiceFactory.hpp>
+#include <com/sun/star/lang/XMultiServiceFactory.hpp>
+#include <com/sun/star/lang/XServiceInfo.hpp>
+#include <com/sun/star/registry/XRegistryKey.hpp>
+
+#include <uno/dispatcher.h>
+#include <uno/mapping.hxx>
+#include <cppuhelper/implbase5.hxx>
+#include <cppuhelper/factory.hxx>
+#include <cppuhelper/interfacecontainer.hxx>
+#include <osl/mutex.hxx>
+#include <osl/thread.h>
+
+
+using namespace osl;
+using namespace std;
+using namespace rtl;
+using namespace cppu;
+using namespace com::sun::star::uno;
+using namespace com::sun::star::lang;
+using namespace com::sun::star::registry;
+using namespace com::sun::star::io;
+
+#include "factreg.hxx"
+
+namespace io_stm {
+
+ class Pump : public WeakImplHelper5<
+ XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo >
+ {
+ Mutex m_aMutex;
+ oslThread m_aThread;
+
+ Reference< XConnectable > m_xPred;
+ Reference< XConnectable > m_xSucc;
+ Reference< XInputStream > m_xInput;
+ Reference< XOutputStream > m_xOutput;
+ OInterfaceContainerHelper m_cnt;
+ sal_Bool m_closeFired;
+
+ void run();
+ static void static_run( void* pObject );
+
+ void close();
+ void fireClose();
+ void fireStarted();
+ void fireTerminated();
+ void fireError( const Any &a );
+
+ public:
+ Pump();
+ virtual ~Pump();
+
+ // XActiveDataSource
+ virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw();
+ virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw();
+
+ // XActiveDataSink
+ virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw();
+ virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw();
+
+ // XActiveDataControl
+ virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
+ virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
+ virtual void SAL_CALL start() throw( RuntimeException );
+ virtual void SAL_CALL terminate() throw();
+
+ // XConnectable
+ virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw();
+ virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw();
+ virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw();
+ virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw();
+
+ public: // XServiceInfo
+ virtual OUString SAL_CALL getImplementationName() throw( );
+ virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( );
+ virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( );
+ };
+
+Pump::Pump() : m_aThread( 0 ),
+ m_cnt( m_aMutex ),
+ m_closeFired( sal_False )
+{
+ g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
+}
+
+Pump::~Pump()
+{
+ // exit gracefully
+ if( m_aThread )
+ {
+ osl_joinWithThread( m_aThread );
+ osl_destroyThread( m_aThread );
+ }
+ g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
+}
+
+void Pump::fireError( const Any & exception )
+{
+ OInterfaceIteratorHelper iter( m_cnt );
+ while( iter.hasMoreElements() )
+ {
+ try
+ {
+ static_cast< XStreamListener * > ( iter.next() )->error( exception );
+ }
+ catch ( RuntimeException &e )
+ {
+ OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
+ OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
+ }
+ }
+}
+
+void Pump::fireClose()
+{
+ sal_Bool bFire = sal_False;
+ {
+ MutexGuard guard( m_aMutex );
+ if( ! m_closeFired )
+ {
+ m_closeFired = sal_True;
+ bFire = sal_True;
+ }
+ }
+
+ if( bFire )
+ {
+ OInterfaceIteratorHelper iter( m_cnt );
+ while( iter.hasMoreElements() )
+ {
+ try
+ {
+ static_cast< XStreamListener * > ( iter.next() )->closed( );
+ }
+ catch ( RuntimeException &e )
+ {
+ OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
+ OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
+ }
+ }
+ }
+}
+
+void Pump::fireStarted()
+{
+ OInterfaceIteratorHelper iter( m_cnt );
+ while( iter.hasMoreElements() )
+ {
+ try
+ {
+ static_cast< XStreamListener * > ( iter.next() )->started( );
+ }
+ catch ( RuntimeException &e )
+ {
+ OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
+ OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
+ }
+ }
+}
+
+void Pump::fireTerminated()
+{
+ OInterfaceIteratorHelper iter( m_cnt );
+ while( iter.hasMoreElements() )
+ {
+ try
+ {
+ static_cast< XStreamListener * > ( iter.next() )->terminated();
+ }
+ catch ( RuntimeException &e )
+ {
+ OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
+ OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
+ }
+ }
+}
+
+
+
+void Pump::close()
+{
+ // close streams and release references
+ Reference< XInputStream > rInput;
+ Reference< XOutputStream > rOutput;
+ {
+ MutexGuard guard( m_aMutex );
+ rInput = m_xInput;
+ m_xInput.clear();
+
+ rOutput = m_xOutput;
+ m_xOutput.clear();
+ m_xSucc.clear();
+ m_xPred.clear();
+ }
+ if( rInput.is() )
+ {
+ try
+ {
+ rInput->closeInput();
+ }
+ catch( Exception & )
+ {
+ // go down calm
+ }
+ }
+ if( rOutput.is() )
+ {
+ try
+ {
+ rOutput->closeOutput();
+ }
+ catch( Exception & )
+ {
+ // go down calm
+ }
+ }
+}
+
+void Pump::static_run( void* pObject )
+{
+ ((Pump*)pObject)->run();
+ ((Pump*)pObject)->release();
+}
+
+void Pump::run()
+{
+ try
+ {
+ fireStarted();
+ try
+ {
+ Reference< XInputStream > rInput;
+ Reference< XOutputStream > rOutput;
+ {
+ Guard< Mutex > aGuard( m_aMutex );
+ rInput = m_xInput;
+ rOutput = m_xOutput;
+ }
+
+ if( ! rInput.is() )
+ {
+ NotConnectedException exception(
+ OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) );
+ throw exception;
+ }
+ Sequence< sal_Int8 > aData;
+ while( rInput->readSomeBytes( aData, 65536 ) )
+ {
+ if( ! rOutput.is() )
+ {
+ NotConnectedException exception(
+ OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) );
+ throw exception;
+ }
+ rOutput->writeBytes( aData );
+ osl_yieldThread();
+ }
+ }
+ catch ( IOException & e )
+ {
+ fireError( makeAny( e ) );
+ }
+ catch ( RuntimeException & e )
+ {
+ fireError( makeAny( e ) );
+ }
+ catch ( Exception & e )
+ {
+ fireError( makeAny( e ) );
+ }
+
+ close();
+ fireClose();
+ }
+ catch ( com::sun::star::uno::Exception &e )
+ {
+ // we are the last on the stack.
+ // this is to avoid crashing the program, when e.g. a bridge crashes
+ OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
+ OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() );
+ }
+}
+
+// ------------------------------------------------------------
+
+/*
+ * XConnectable
+ */
+
+void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ m_xPred = xPred;
+}
+
+// ------------------------------------------------------------
+
+Reference< XConnectable > Pump::getPredecessor() throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ return m_xPred;
+}
+
+// ------------------------------------------------------------
+
+void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ m_xSucc = xSucc;
+}
+
+// ------------------------------------------------------------
+
+Reference< XConnectable > Pump::getSuccessor() throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ return m_xSucc;
+}
+
+// -----------------------------------------------------------------
+
+/*
+ * XActiveDataControl
+ */
+
+void Pump::addListener( const Reference< XStreamListener >& xListener ) throw()
+{
+ m_cnt.addInterface( xListener );
+}
+
+// ------------------------------------------------------------
+
+void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw()
+{
+ m_cnt.removeInterface( xListener );
+}
+
+// ------------------------------------------------------------
+
+void Pump::start() throw( RuntimeException )
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this);
+ if( m_aThread )
+ {
+ // will be released by OPump::static_run
+ acquire();
+ osl_resumeThread( m_aThread );
+ }
+ else
+ {
+ throw RuntimeException(
+ OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )),
+ *this);
+ }
+}
+
+// ------------------------------------------------------------
+
+void Pump::terminate() throw()
+{
+ close();
+
+ // wait for the worker to die
+ if( m_aThread )
+ osl_joinWithThread( m_aThread );
+
+ fireTerminated();
+ fireClose();
+}
+
+// ------------------------------------------------------------
+
+/*
+ * XActiveDataSink
+ */
+
+void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ m_xInput = xStream;
+ Reference< XConnectable > xConnect( xStream, UNO_QUERY );
+ if( xConnect.is() )
+ xConnect->setSuccessor( this );
+ // data transfer starts in XActiveDataControl::start
+}
+
+// ------------------------------------------------------------
+
+Reference< XInputStream > Pump::getInputStream() throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ return m_xInput;
+}
+
+// ------------------------------------------------------------
+
+/*
+ * XActiveDataSource
+ */
+
+void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ m_xOutput = xOut;
+ Reference< XConnectable > xConnect( xOut, UNO_QUERY );
+ if( xConnect.is() )
+ xConnect->setPredecessor( this );
+ // data transfer starts in XActiveDataControl::start
+}
+
+// ------------------------------------------------------------
+
+Reference< XOutputStream > Pump::getOutputStream() throw()
+{
+ Guard< Mutex > aGuard( m_aMutex );
+ return m_xOutput;
+}
+
+
+// XServiceInfo
+OUString Pump::getImplementationName() throw( )
+{
+ return OPumpImpl_getImplementationName();
+}
+
+// XServiceInfo
+sal_Bool Pump::supportsService(const OUString& ServiceName) throw( )
+{
+ Sequence< OUString > aSNL = getSupportedServiceNames();
+ const OUString * pArray = aSNL.getConstArray();
+
+ for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
+ if( pArray[i] == ServiceName )
+ return sal_True;
+
+ return sal_False;
+}
+
+// XServiceInfo
+Sequence< OUString > Pump::getSupportedServiceNames(void) throw( )
+{
+ return OPumpImpl_getSupportedServiceNames();
+}
+
+
+Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception)
+{
+ return Reference< XInterface >( *new Pump );
+}
+
+OUString OPumpImpl_getImplementationName()
+{
+ return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") );
+}
+
+Sequence<OUString> OPumpImpl_getSupportedServiceNames(void)
+{
+ OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) );
+ Sequence< OUString > seq( &s , 1 );
+ return seq;
+}
+
+}
+