xref: /aoo41x/main/io/source/stm/opipe.cxx (revision 3716f815)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_io.hxx"
26 
27 // streams
28 #include <com/sun/star/io/XInputStream.hpp>
29 #include <com/sun/star/io/XOutputStream.hpp>
30 #include <com/sun/star/io/XConnectable.hpp>
31 
32 #include <com/sun/star/lang/XServiceInfo.hpp>
33 
34 #include <cppuhelper/factory.hxx>
35 
36 #include <cppuhelper/implbase4.hxx>      // OWeakObject
37 
38 #include <osl/conditn.hxx>
39 #include <osl/mutex.hxx>
40 
41 #include <limits>
42 #include <string.h>
43 
44 using namespace ::rtl;
45 using namespace ::osl;
46 using namespace ::cppu;
47 using namespace ::com::sun::star::uno;
48 using namespace ::com::sun::star::io;
49 using namespace ::com::sun::star::lang;
50 
51 #include "factreg.hxx"
52 #include "streamhelper.hxx"
53 
54 // Implementation and service names
55 #define IMPLEMENTATION_NAME	"com.sun.star.comp.io.stm.Pipe"
56 #define SERVICE_NAME "com.sun.star.io.Pipe"
57 
58 namespace io_stm{
59 
60 class OPipeImpl :
61 	public WeakImplHelper4< XInputStream , XOutputStream , XConnectable , XServiceInfo >
62 {
63 public:
64 	OPipeImpl( );
65 	~OPipeImpl();
66 
67 public: // XInputStream
68     virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
69 		throw(	NotConnectedException,
70 				BufferSizeExceededException,
71 				RuntimeException );
72     virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
73 		throw( NotConnectedException,
74 			   BufferSizeExceededException,
75 			   RuntimeException );
76     virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip)
77 		throw( NotConnectedException,
78 			   BufferSizeExceededException,
79 			   RuntimeException );
80     virtual sal_Int32 SAL_CALL available(void)
81 		throw( NotConnectedException,
82 			   RuntimeException );
83     virtual void SAL_CALL closeInput(void)
84 		throw( NotConnectedException,
85 			   RuntimeException );
86 
87 public: // XOutputStream
88 
89     virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData)
90 		throw( NotConnectedException,
91 			   BufferSizeExceededException,
92 			   RuntimeException );
93     virtual void SAL_CALL flush(void)
94 		throw( NotConnectedException,
95 			   BufferSizeExceededException,
96 			   RuntimeException );
97     virtual void SAL_CALL closeOutput(void)
98 		throw( NotConnectedException,
99 			   BufferSizeExceededException,
100 			   RuntimeException );
101 
102 public: // XConnectable
103     virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor)
104 		throw( RuntimeException );
105     virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException );
106     virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor)
107 		throw( RuntimeException );
108     virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException ) ;
109 
110 
111 public: // XServiceInfo
112     OUString                    SAL_CALL getImplementationName() throw(  );
113     Sequence< OUString >         SAL_CALL getSupportedServiceNames(void) throw(  );
114     sal_Bool                        SAL_CALL supportsService(const OUString& ServiceName) throw(  );
115 
116 private:
117 
118 	// DEBUG
119 	inline void checkInvariant();
120 
121 	Reference < XConnectable > 	m_succ;
122 	Reference < XConnectable > 	m_pred;
123 
124 	sal_Int32 m_nBytesToSkip;
125 
126 	sal_Int8  *m_p;
127 
128 	sal_Bool m_bOutputStreamClosed;
129 	sal_Bool m_bInputStreamClosed;
130 
131 	oslCondition m_conditionBytesAvail;
132 	Mutex     m_mutexAccess;
133 	IFIFO		*m_pFIFO;
134 };
135 
136 
137 
OPipeImpl()138 OPipeImpl::OPipeImpl()
139 {
140 	g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
141 	m_nBytesToSkip  = 0;
142 
143 	m_bOutputStreamClosed 	= sal_False;
144 	m_bInputStreamClosed 	= sal_False;
145 
146 	m_pFIFO = new MemFIFO;
147 	m_conditionBytesAvail = osl_createCondition();
148 }
149 
~OPipeImpl()150 OPipeImpl::~OPipeImpl()
151 {
152 	osl_destroyCondition( m_conditionBytesAvail );
153 	delete m_pFIFO;
154 	g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
155 }
156 
157 
158 // These invariants must hold when entering a guarded method or leaving a guarded method.
checkInvariant()159 void OPipeImpl::checkInvariant()
160 {
161 
162 }
163 
readBytes(Sequence<sal_Int8> & aData,sal_Int32 nBytesToRead)164 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
165 	throw( NotConnectedException, BufferSizeExceededException,RuntimeException )
166 {
167 	while( sal_True )
168 	{
169 		{ // start guarded section
170 			MutexGuard guard( m_mutexAccess );
171 			if( m_bInputStreamClosed )
172 			{
173 				throw NotConnectedException(
174                     OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readBytes NotConnectedException" )),
175                     *this );
176 			}
177 			sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
178 
179 			if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
180 			{
181 				nBytesToRead = nOccupiedBufferLen;
182 			}
183 
184 			if( nOccupiedBufferLen < nBytesToRead )
185 			{
186 				// wait outside guarded section
187 				osl_resetCondition( m_conditionBytesAvail );
188 			}
189 			else {
190 				// necessary bytes are available
191 				m_pFIFO->read( aData , nBytesToRead );
192 				return nBytesToRead;
193 			}
194 		} // end guarded section
195 
196 		// wait for new data outside guarded section!
197 		osl_waitCondition( m_conditionBytesAvail , 0 );
198 	}
199 }
200 
201 
readSomeBytes(Sequence<sal_Int8> & aData,sal_Int32 nMaxBytesToRead)202 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
203 	throw( NotConnectedException,
204 		   BufferSizeExceededException,
205 		   RuntimeException )
206 {
207 	while( sal_True ) {
208 		{
209 			MutexGuard guard( m_mutexAccess );
210 			if( m_bInputStreamClosed )
211 			{
212 				throw NotConnectedException(
213                     OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readSomeBytes NotConnectedException" )),
214                     *this );
215 			}
216 			if( m_pFIFO->getSize() )
217 			{
218 				sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() );
219 				aData.realloc( nSize );
220 				m_pFIFO->read( aData , nSize );
221 				return nSize;
222 			}
223 
224 			if( m_bOutputStreamClosed )
225 			{
226 				// no bytes in buffer anymore
227 				return 0;
228 			}
229 		}
230 
231 		osl_waitCondition( m_conditionBytesAvail , 0 );
232 	}
233 }
234 
235 
skipBytes(sal_Int32 nBytesToSkip)236 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
237 	throw( NotConnectedException,
238 		   BufferSizeExceededException,
239 		   RuntimeException )
240 {
241 	MutexGuard guard( m_mutexAccess );
242     if( m_bInputStreamClosed )
243     {
244         throw NotConnectedException(
245             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes NotConnectedException" ) ),
246             *this );
247     }
248 
249 	if( nBytesToSkip < 0
250         || (nBytesToSkip
251             > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
252 	{
253 		throw BufferSizeExceededException(
254             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes BufferSizeExceededException" )),
255             *this );
256 	}
257 	m_nBytesToSkip += nBytesToSkip;
258 
259 	nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip );
260 	m_pFIFO->skip( nBytesToSkip );
261 	m_nBytesToSkip -= nBytesToSkip;
262 }
263 
264 
available(void)265 sal_Int32 OPipeImpl::available(void)
266 	throw( NotConnectedException,
267 		   RuntimeException )
268  {
269 	MutexGuard guard( m_mutexAccess );
270     if( m_bInputStreamClosed )
271     {
272         throw NotConnectedException(
273             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::available NotConnectedException" ) ),
274             *this );
275     }
276 	checkInvariant();
277 	return m_pFIFO->getSize();
278 }
279 
closeInput(void)280 void OPipeImpl::closeInput(void)
281 	throw( NotConnectedException,
282 		   RuntimeException)
283 {
284 	MutexGuard guard( m_mutexAccess );
285 
286 	m_bInputStreamClosed = sal_True;
287 
288 	delete m_pFIFO;
289 	m_pFIFO = 0;
290 
291 	// readBytes may throw an exception
292 	osl_setCondition( m_conditionBytesAvail );
293 
294 	setSuccessor( Reference< XConnectable > () );
295 	return;
296 }
297 
298 
writeBytes(const Sequence<sal_Int8> & aData)299 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
300 	throw( NotConnectedException,
301 		   BufferSizeExceededException,
302 		   RuntimeException)
303 {
304 	MutexGuard guard( m_mutexAccess );
305 	checkInvariant();
306 
307 	if( m_bOutputStreamClosed )
308 	{
309         throw NotConnectedException(
310             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (outputstream)" )),
311             *this );
312 	}
313 
314 	if( m_bInputStreamClosed )
315 	{
316         throw NotConnectedException(
317             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (inputstream)" )),
318             *this );
319 	}
320 
321 	// check skipping
322 	sal_Int32 nLen = aData.getLength();
323 	if( m_nBytesToSkip  && m_nBytesToSkip >= nLen  ) {
324 		// all must be skipped - forget whole call
325 		m_nBytesToSkip -= nLen;
326 		return;
327 	}
328 
329 	// adjust buffersize if necessary
330 
331 	try
332 	{
333 		if( m_nBytesToSkip )
334 		{
335 			Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
336 			memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
337 			m_pFIFO->write( seqCopy );
338 		}
339 		else
340 		{
341 			m_pFIFO->write( aData );
342 		}
343 		m_nBytesToSkip = 0;
344 	}
345 	catch ( IFIFO_OutOfBoundsException & )
346 	{
347 		throw BufferSizeExceededException(
348             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
349             *this );
350 	}
351 	catch ( IFIFO_OutOfMemoryException & )
352 	{
353 		throw BufferSizeExceededException(
354             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
355             *this );
356 	}
357 
358 	// readBytes may check again if enough bytes are available
359 	osl_setCondition( m_conditionBytesAvail );
360 
361 	checkInvariant();
362 }
363 
364 
flush(void)365 void OPipeImpl::flush(void)
366 	throw( NotConnectedException,
367 		   BufferSizeExceededException,
368 		   RuntimeException)
369 {
370 	// nothing to do for a pipe
371 	return;
372 }
373 
closeOutput(void)374 void OPipeImpl::closeOutput(void)
375 	throw( NotConnectedException,
376 		   BufferSizeExceededException,
377 		   RuntimeException)
378 {
379 	MutexGuard guard( m_mutexAccess );
380 
381 	m_bOutputStreamClosed = sal_True;
382 	osl_setCondition( m_conditionBytesAvail );
383 	setPredecessor( Reference < XConnectable > () );
384 	return;
385 }
386 
387 
setSuccessor(const Reference<XConnectable> & r)388 void OPipeImpl::setSuccessor( const Reference < XConnectable >  &r )
389 	throw( RuntimeException )
390 {
391      /// if the references match, nothing needs to be done
392      if( m_succ != r ) {
393          /// store the reference for later use
394          m_succ = r;
395 
396          if( m_succ.is() )
397 		 {
398               m_succ->setPredecessor(
399 				  Reference< XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
400          }
401      }
402 }
403 
getSuccessor()404 Reference < XConnectable > OPipeImpl::getSuccessor() 	throw( RuntimeException )
405 {
406 	return m_succ;
407 }
408 
409 
// XConnectable
setPredecessor(const Reference<XConnectable> & r)411 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
412 	throw( RuntimeException )
413 {
414 	if( r != m_pred ) {
415 		m_pred = r;
416 		if( m_pred.is() ) {
417 			m_pred->setSuccessor(
418 				Reference < XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
419 		}
420 	}
421 }
422 
getPredecessor()423 Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException )
424 {
425 	return m_pred;
426 }
427 
428 
429 
430 
431 // XServiceInfo
getImplementationName()432 OUString OPipeImpl::getImplementationName() throw(  )
433 {
434     return OPipeImpl_getImplementationName();
435 }
436 
437 // XServiceInfo
supportsService(const OUString & ServiceName)438 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw(  )
439 {
440     Sequence< OUString > aSNL = getSupportedServiceNames();
441     const OUString * pArray = aSNL.getConstArray();
442 
443     for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
444         if( pArray[i] == ServiceName )
445             return sal_True;
446 
447     return sal_False;
448 }
449 
450 // XServiceInfo
getSupportedServiceNames(void)451 Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw(  )
452 {
453 	return OPipeImpl_getSupportedServiceNames();
454 }
455 
456 
457 
458 
459 
460 /* implementation functions
461 *
462 *
463 */
464 
465 
/**
 * Factory function: creates a new OPipeImpl and returns it as XInterface.
 * The component context argument is not used.
 */
Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance(
	const Reference < XComponentContext > & ) throw(Exception)
{
	OWeakObject * const pPipe = static_cast< OWeakObject * >( new OPipeImpl );
	return Reference < XInterface > ( pPipe );
}
473 
474 
OPipeImpl_getImplementationName()475 OUString 	OPipeImpl_getImplementationName()
476 {
477 	return OUString( RTL_CONSTASCII_USTRINGPARAM ( IMPLEMENTATION_NAME ) );
478 }
479 
OPipeImpl_getSupportedServiceNames(void)480 Sequence<OUString> OPipeImpl_getSupportedServiceNames(void)
481 {
482 	Sequence<OUString> aRet(1);
483 	aRet.getArray()[0] = OUString( RTL_CONSTASCII_USTRINGPARAM( SERVICE_NAME ));
484 	return aRet;
485 }
486 }
487 
488 
489