xref: /trunk/main/io/source/stm/opump.cxx (revision 1ecadb572e7010ff3b3382ad9bf179dbc6efadbb)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_io.hxx"
30 
31 #include <stdio.h>
32 
33 #include <osl/diagnose.h>
34 
35 #include <com/sun/star/io/XActiveDataSource.hpp>
36 #include <com/sun/star/io/XActiveDataSink.hpp>
37 #include <com/sun/star/io/XActiveDataControl.hpp>
38 #include <com/sun/star/io/XConnectable.hpp>
39 #include <com/sun/star/lang/XSingleServiceFactory.hpp>
40 #include <com/sun/star/lang/XMultiServiceFactory.hpp>
41 #include <com/sun/star/lang/XServiceInfo.hpp>
42 #include <com/sun/star/registry/XRegistryKey.hpp>
43 
44 #include <uno/dispatcher.h>
45 #include <uno/mapping.hxx>
46 #include <cppuhelper/implbase5.hxx>
47 #include <cppuhelper/factory.hxx>
48 #include <cppuhelper/interfacecontainer.hxx>
49 #include <osl/mutex.hxx>
50 #include <osl/thread.h>
51 
52 
53 using namespace osl;
54 using namespace std;
55 using namespace rtl;
56 using namespace cppu;
57 using namespace com::sun::star::uno;
58 using namespace com::sun::star::lang;
59 using namespace com::sun::star::registry;
60 using namespace com::sun::star::io;
61 
62 #include "factreg.hxx"
63 
64 namespace io_stm {
65 
66     class Pump : public WeakImplHelper5<
67           XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo >
68     {
69         Mutex                                   m_aMutex;
70         oslThread                               m_aThread;
71 
72         Reference< XConnectable >               m_xPred;
73         Reference< XConnectable >               m_xSucc;
74         Reference< XInputStream >               m_xInput;
75         Reference< XOutputStream >              m_xOutput;
76         OInterfaceContainerHelper               m_cnt;
77         sal_Bool                                m_closeFired;
78 
79         void run();
80         static void static_run( void* pObject );
81 
82         void close();
83         void fireClose();
84         void fireStarted();
85         void fireTerminated();
86         void fireError( const Any &a );
87 
88     public:
89         Pump();
90         virtual ~Pump();
91 
92         // XActiveDataSource
93         virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw();
94         virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw();
95 
96         // XActiveDataSink
97         virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw();
98         virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw();
99 
100         // XActiveDataControl
101         virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
102         virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw();
103         virtual void SAL_CALL start() throw( RuntimeException );
104         virtual void SAL_CALL terminate() throw();
105 
106         // XConnectable
107         virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw();
108         virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw();
109         virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw();
110         virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw();
111 
112     public: // XServiceInfo
113         virtual OUString    SAL_CALL getImplementationName() throw(  );
114         virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw(  );
115         virtual sal_Bool     SAL_CALL supportsService(const OUString& ServiceName) throw(  );
116     };
117 
118 Pump::Pump() : m_aThread( 0 ),
119                m_cnt( m_aMutex ),
120                m_closeFired( sal_False )
121 {
122     g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
123 }
124 
125 Pump::~Pump()
126 {
127     // exit gracefully
128     if( m_aThread )
129     {
130         osl_joinWithThread( m_aThread );
131         osl_destroyThread( m_aThread );
132     }
133     g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
134 }
135 
136 void Pump::fireError( const  Any & exception )
137 {
138     OInterfaceIteratorHelper iter( m_cnt );
139     while( iter.hasMoreElements() )
140     {
141         try
142         {
143             static_cast< XStreamListener * > ( iter.next() )->error( exception );
144         }
145         catch ( RuntimeException &e )
146         {
147             OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
148             OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
149         }
150     }
151 }
152 
153 void Pump::fireClose()
154 {
155     sal_Bool bFire = sal_False;
156     {
157         MutexGuard guard( m_aMutex );
158         if( ! m_closeFired  )
159         {
160             m_closeFired = sal_True;
161             bFire = sal_True;
162         }
163     }
164 
165     if( bFire )
166     {
167         OInterfaceIteratorHelper iter( m_cnt );
168         while( iter.hasMoreElements() )
169         {
170             try
171             {
172                 static_cast< XStreamListener * > ( iter.next() )->closed( );
173             }
174             catch ( RuntimeException &e )
175             {
176                 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
177                 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
178             }
179         }
180     }
181 }
182 
183 void Pump::fireStarted()
184 {
185     OInterfaceIteratorHelper iter( m_cnt );
186     while( iter.hasMoreElements() )
187     {
188         try
189         {
190             static_cast< XStreamListener * > ( iter.next() )->started( );
191         }
192         catch ( RuntimeException &e )
193         {
194             OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
195             OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
196         }
197     }
198 }
199 
200 void Pump::fireTerminated()
201 {
202     OInterfaceIteratorHelper iter( m_cnt );
203     while( iter.hasMoreElements() )
204     {
205         try
206         {
207             static_cast< XStreamListener * > ( iter.next() )->terminated();
208         }
209         catch ( RuntimeException &e )
210         {
211             OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
212             OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() );
213         }
214     }
215 }
216 
217 
218 
219 void Pump::close()
220 {
221     // close streams and release references
222     Reference< XInputStream > rInput;
223     Reference< XOutputStream > rOutput;
224     {
225         MutexGuard guard( m_aMutex );
226         rInput = m_xInput;
227         m_xInput.clear();
228 
229         rOutput = m_xOutput;
230         m_xOutput.clear();
231         m_xSucc.clear();
232         m_xPred.clear();
233     }
234     if( rInput.is() )
235     {
236         try
237         {
238             rInput->closeInput();
239         }
240         catch( Exception & )
241         {
242             // go down calm
243         }
244     }
245     if( rOutput.is() )
246     {
247         try
248         {
249             rOutput->closeOutput();
250         }
251         catch( Exception & )
252         {
253             // go down calm
254         }
255     }
256 }
257 
258 void Pump::static_run( void* pObject )
259 {
260     ((Pump*)pObject)->run();
261     ((Pump*)pObject)->release();
262 }
263 
264 void Pump::run()
265 {
266     try
267     {
268         fireStarted();
269         try
270         {
271             Reference< XInputStream > rInput;
272             Reference< XOutputStream > rOutput;
273             {
274                 Guard< Mutex > aGuard( m_aMutex );
275                 rInput = m_xInput;
276                 rOutput = m_xOutput;
277             }
278 
279             if( ! rInput.is() )
280             {
281                 NotConnectedException exception(
282                     OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) );
283                 throw exception;
284             }
285             Sequence< sal_Int8 > aData;
286             while( rInput->readSomeBytes( aData, 65536 ) )
287             {
288                 if( ! rOutput.is() )
289                 {
290                     NotConnectedException exception(
291                         OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) );
292                     throw exception;
293                 }
294                 rOutput->writeBytes( aData );
295                 osl_yieldThread();
296             }
297         }
298         catch ( IOException & e )
299         {
300             fireError( makeAny( e ) );
301         }
302         catch ( RuntimeException & e )
303         {
304             fireError( makeAny( e ) );
305         }
306         catch ( Exception & e )
307         {
308             fireError( makeAny( e ) );
309         }
310 
311         close();
312         fireClose();
313     }
314     catch ( com::sun::star::uno::Exception &e )
315     {
316         // we are the last on the stack.
317         // this is to avoid crashing the program, when e.g. a bridge crashes
318         OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US );
319         OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() );
320     }
321 }
322 
323 // ------------------------------------------------------------
324 
325 /*
326  * XConnectable
327  */
328 
329 void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw()
330 {
331     Guard< Mutex > aGuard( m_aMutex );
332     m_xPred = xPred;
333 }
334 
335 // ------------------------------------------------------------
336 
337 Reference< XConnectable > Pump::getPredecessor() throw()
338 {
339     Guard< Mutex > aGuard( m_aMutex );
340     return m_xPred;
341 }
342 
343 // ------------------------------------------------------------
344 
345 void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw()
346 {
347     Guard< Mutex > aGuard( m_aMutex );
348     m_xSucc = xSucc;
349 }
350 
351 // ------------------------------------------------------------
352 
353 Reference< XConnectable > Pump::getSuccessor() throw()
354 {
355     Guard< Mutex > aGuard( m_aMutex );
356     return m_xSucc;
357 }
358 
359 // -----------------------------------------------------------------
360 
361 /*
362  * XActiveDataControl
363  */
364 
365 void Pump::addListener( const Reference< XStreamListener >& xListener ) throw()
366 {
367     m_cnt.addInterface( xListener );
368 }
369 
370 // ------------------------------------------------------------
371 
372 void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw()
373 {
374     m_cnt.removeInterface( xListener );
375 }
376 
377 // ------------------------------------------------------------
378 
379 void Pump::start() throw( RuntimeException )
380 {
381     Guard< Mutex > aGuard( m_aMutex );
382     m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this);
383     if( m_aThread )
384     {
385         // will be released by OPump::static_run
386         acquire();
387         osl_resumeThread( m_aThread );
388     }
389     else
390     {
391         throw RuntimeException(
392             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )),
393             *this);
394     }
395 }
396 
397 // ------------------------------------------------------------
398 
399 void Pump::terminate() throw()
400 {
401     close();
402 
403     // wait for the worker to die
404     if( m_aThread )
405         osl_joinWithThread( m_aThread );
406 
407     fireTerminated();
408     fireClose();
409 }
410 
411 // ------------------------------------------------------------
412 
413 /*
414  * XActiveDataSink
415  */
416 
417 void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw()
418 {
419     Guard< Mutex > aGuard( m_aMutex );
420     m_xInput = xStream;
421     Reference< XConnectable > xConnect( xStream, UNO_QUERY );
422     if( xConnect.is() )
423         xConnect->setSuccessor( this );
424     // data transfer starts in XActiveDataControl::start
425 }
426 
427 // ------------------------------------------------------------
428 
429 Reference< XInputStream > Pump::getInputStream() throw()
430 {
431     Guard< Mutex > aGuard( m_aMutex );
432     return m_xInput;
433 }
434 
435 // ------------------------------------------------------------
436 
437 /*
438  * XActiveDataSource
439  */
440 
441 void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw()
442 {
443     Guard< Mutex > aGuard( m_aMutex );
444     m_xOutput = xOut;
445     Reference< XConnectable > xConnect( xOut, UNO_QUERY );
446     if( xConnect.is() )
447         xConnect->setPredecessor( this );
448     // data transfer starts in XActiveDataControl::start
449 }
450 
451 // ------------------------------------------------------------
452 
453 Reference< XOutputStream > Pump::getOutputStream() throw()
454 {
455     Guard< Mutex > aGuard( m_aMutex );
456     return m_xOutput;
457 }
458 
459 
460 // XServiceInfo
461 OUString Pump::getImplementationName() throw(  )
462 {
463     return OPumpImpl_getImplementationName();
464 }
465 
466 // XServiceInfo
467 sal_Bool Pump::supportsService(const OUString& ServiceName) throw(  )
468 {
469     Sequence< OUString > aSNL = getSupportedServiceNames();
470     const OUString * pArray = aSNL.getConstArray();
471 
472     for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
473         if( pArray[i] == ServiceName )
474             return sal_True;
475 
476     return sal_False;
477 }
478 
479 // XServiceInfo
480 Sequence< OUString > Pump::getSupportedServiceNames(void) throw(  )
481 {
482     return OPumpImpl_getSupportedServiceNames();
483 }
484 
485 
486 Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception)
487 {
488     return Reference< XInterface >( *new Pump );
489 }
490 
491 OUString OPumpImpl_getImplementationName()
492 {
493     return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") );
494 }
495 
496 Sequence<OUString> OPumpImpl_getSupportedServiceNames(void)
497 {
498     OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) );
499     Sequence< OUString > seq( &s , 1 );
500     return seq;
501 }
502 
503 }
504 
505