1 /**************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21
22
23
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_io.hxx"
26
27 // streams
28 #include <com/sun/star/io/XInputStream.hpp>
29 #include <com/sun/star/io/XOutputStream.hpp>
30 #include <com/sun/star/io/XConnectable.hpp>
31
32 #include <com/sun/star/lang/XServiceInfo.hpp>
33
34 #include <cppuhelper/factory.hxx>
35
36 #include <cppuhelper/implbase4.hxx> // OWeakObject
37
38 #include <osl/conditn.hxx>
39 #include <osl/mutex.hxx>
40
41 #include <limits>
42 #include <string.h>
43
44 using namespace ::rtl;
45 using namespace ::osl;
46 using namespace ::cppu;
47 using namespace ::com::sun::star::uno;
48 using namespace ::com::sun::star::io;
49 using namespace ::com::sun::star::lang;
50
51 #include "factreg.hxx"
52 #include "streamhelper.hxx"
53
54 // Implementation and service names
55 #define IMPLEMENTATION_NAME "com.sun.star.comp.io.stm.Pipe"
56 #define SERVICE_NAME "com.sun.star.io.Pipe"
57
58 namespace io_stm{
59
60 class OPipeImpl :
61 public WeakImplHelper4< XInputStream , XOutputStream , XConnectable , XServiceInfo >
62 {
63 public:
64 OPipeImpl( );
65 ~OPipeImpl();
66
67 public: // XInputStream
68 virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
69 throw( NotConnectedException,
70 BufferSizeExceededException,
71 RuntimeException );
72 virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
73 throw( NotConnectedException,
74 BufferSizeExceededException,
75 RuntimeException );
76 virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip)
77 throw( NotConnectedException,
78 BufferSizeExceededException,
79 RuntimeException );
80 virtual sal_Int32 SAL_CALL available(void)
81 throw( NotConnectedException,
82 RuntimeException );
83 virtual void SAL_CALL closeInput(void)
84 throw( NotConnectedException,
85 RuntimeException );
86
87 public: // XOutputStream
88
89 virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData)
90 throw( NotConnectedException,
91 BufferSizeExceededException,
92 RuntimeException );
93 virtual void SAL_CALL flush(void)
94 throw( NotConnectedException,
95 BufferSizeExceededException,
96 RuntimeException );
97 virtual void SAL_CALL closeOutput(void)
98 throw( NotConnectedException,
99 BufferSizeExceededException,
100 RuntimeException );
101
102 public: // XConnectable
103 virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor)
104 throw( RuntimeException );
105 virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException );
106 virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor)
107 throw( RuntimeException );
108 virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException ) ;
109
110
111 public: // XServiceInfo
112 OUString SAL_CALL getImplementationName() throw( );
113 Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( );
114 sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( );
115
116 private:
117
118 // DEBUG
119 inline void checkInvariant();
120
121 Reference < XConnectable > m_succ;
122 Reference < XConnectable > m_pred;
123
124 sal_Int32 m_nBytesToSkip;
125
126 sal_Int8 *m_p;
127
128 sal_Bool m_bOutputStreamClosed;
129 sal_Bool m_bInputStreamClosed;
130
131 oslCondition m_conditionBytesAvail;
132 Mutex m_mutexAccess;
133 IFIFO *m_pFIFO;
134 };
135
136
137
OPipeImpl()138 OPipeImpl::OPipeImpl()
139 {
140 g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
141 m_nBytesToSkip = 0;
142
143 m_bOutputStreamClosed = sal_False;
144 m_bInputStreamClosed = sal_False;
145
146 m_pFIFO = new MemFIFO;
147 m_conditionBytesAvail = osl_createCondition();
148 }
149
~OPipeImpl()150 OPipeImpl::~OPipeImpl()
151 {
152 osl_destroyCondition( m_conditionBytesAvail );
153 delete m_pFIFO;
154 g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
155 }
156
157
158 // These invariants must hold when entering a guarded method or leaving a guarded method.
checkInvariant()159 void OPipeImpl::checkInvariant()
160 {
161
162 }
163
readBytes(Sequence<sal_Int8> & aData,sal_Int32 nBytesToRead)164 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
165 throw( NotConnectedException, BufferSizeExceededException,RuntimeException )
166 {
167 while( sal_True )
168 {
169 { // start guarded section
170 MutexGuard guard( m_mutexAccess );
171 if( m_bInputStreamClosed )
172 {
173 throw NotConnectedException(
174 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readBytes NotConnectedException" )),
175 *this );
176 }
177 sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
178
179 if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
180 {
181 nBytesToRead = nOccupiedBufferLen;
182 }
183
184 if( nOccupiedBufferLen < nBytesToRead )
185 {
186 // wait outside guarded section
187 osl_resetCondition( m_conditionBytesAvail );
188 }
189 else {
190 // necessary bytes are available
191 m_pFIFO->read( aData , nBytesToRead );
192 return nBytesToRead;
193 }
194 } // end guarded section
195
196 // wait for new data outside guarded section!
197 osl_waitCondition( m_conditionBytesAvail , 0 );
198 }
199 }
200
201
readSomeBytes(Sequence<sal_Int8> & aData,sal_Int32 nMaxBytesToRead)202 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
203 throw( NotConnectedException,
204 BufferSizeExceededException,
205 RuntimeException )
206 {
207 while( sal_True ) {
208 {
209 MutexGuard guard( m_mutexAccess );
210 if( m_bInputStreamClosed )
211 {
212 throw NotConnectedException(
213 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readSomeBytes NotConnectedException" )),
214 *this );
215 }
216 if( m_pFIFO->getSize() )
217 {
218 sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() );
219 aData.realloc( nSize );
220 m_pFIFO->read( aData , nSize );
221 return nSize;
222 }
223
224 if( m_bOutputStreamClosed )
225 {
226 // no bytes in buffer anymore
227 return 0;
228 }
229 }
230
231 osl_waitCondition( m_conditionBytesAvail , 0 );
232 }
233 }
234
235
skipBytes(sal_Int32 nBytesToSkip)236 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
237 throw( NotConnectedException,
238 BufferSizeExceededException,
239 RuntimeException )
240 {
241 MutexGuard guard( m_mutexAccess );
242 if( m_bInputStreamClosed )
243 {
244 throw NotConnectedException(
245 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes NotConnectedException" ) ),
246 *this );
247 }
248
249 if( nBytesToSkip < 0
250 || (nBytesToSkip
251 > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
252 {
253 throw BufferSizeExceededException(
254 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes BufferSizeExceededException" )),
255 *this );
256 }
257 m_nBytesToSkip += nBytesToSkip;
258
259 nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip );
260 m_pFIFO->skip( nBytesToSkip );
261 m_nBytesToSkip -= nBytesToSkip;
262 }
263
264
available(void)265 sal_Int32 OPipeImpl::available(void)
266 throw( NotConnectedException,
267 RuntimeException )
268 {
269 MutexGuard guard( m_mutexAccess );
270 if( m_bInputStreamClosed )
271 {
272 throw NotConnectedException(
273 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::available NotConnectedException" ) ),
274 *this );
275 }
276 checkInvariant();
277 return m_pFIFO->getSize();
278 }
279
closeInput(void)280 void OPipeImpl::closeInput(void)
281 throw( NotConnectedException,
282 RuntimeException)
283 {
284 MutexGuard guard( m_mutexAccess );
285
286 m_bInputStreamClosed = sal_True;
287
288 delete m_pFIFO;
289 m_pFIFO = 0;
290
291 // readBytes may throw an exception
292 osl_setCondition( m_conditionBytesAvail );
293
294 setSuccessor( Reference< XConnectable > () );
295 return;
296 }
297
298
writeBytes(const Sequence<sal_Int8> & aData)299 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
300 throw( NotConnectedException,
301 BufferSizeExceededException,
302 RuntimeException)
303 {
304 MutexGuard guard( m_mutexAccess );
305 checkInvariant();
306
307 if( m_bOutputStreamClosed )
308 {
309 throw NotConnectedException(
310 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (outputstream)" )),
311 *this );
312 }
313
314 if( m_bInputStreamClosed )
315 {
316 throw NotConnectedException(
317 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (inputstream)" )),
318 *this );
319 }
320
321 // check skipping
322 sal_Int32 nLen = aData.getLength();
323 if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) {
324 // all must be skipped - forget whole call
325 m_nBytesToSkip -= nLen;
326 return;
327 }
328
329 // adjust buffersize if necessary
330
331 try
332 {
333 if( m_nBytesToSkip )
334 {
335 Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
336 memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
337 m_pFIFO->write( seqCopy );
338 }
339 else
340 {
341 m_pFIFO->write( aData );
342 }
343 m_nBytesToSkip = 0;
344 }
345 catch ( IFIFO_OutOfBoundsException & )
346 {
347 throw BufferSizeExceededException(
348 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
349 *this );
350 }
351 catch ( IFIFO_OutOfMemoryException & )
352 {
353 throw BufferSizeExceededException(
354 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
355 *this );
356 }
357
358 // readBytes may check again if enough bytes are available
359 osl_setCondition( m_conditionBytesAvail );
360
361 checkInvariant();
362 }
363
364
flush(void)365 void OPipeImpl::flush(void)
366 throw( NotConnectedException,
367 BufferSizeExceededException,
368 RuntimeException)
369 {
370 // nothing to do for a pipe
371 return;
372 }
373
closeOutput(void)374 void OPipeImpl::closeOutput(void)
375 throw( NotConnectedException,
376 BufferSizeExceededException,
377 RuntimeException)
378 {
379 MutexGuard guard( m_mutexAccess );
380
381 m_bOutputStreamClosed = sal_True;
382 osl_setCondition( m_conditionBytesAvail );
383 setPredecessor( Reference < XConnectable > () );
384 return;
385 }
386
387
setSuccessor(const Reference<XConnectable> & r)388 void OPipeImpl::setSuccessor( const Reference < XConnectable > &r )
389 throw( RuntimeException )
390 {
391 /// if the references match, nothing needs to be done
392 if( m_succ != r ) {
393 /// store the reference for later use
394 m_succ = r;
395
396 if( m_succ.is() )
397 {
398 m_succ->setPredecessor(
399 Reference< XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
400 }
401 }
402 }
403
getSuccessor()404 Reference < XConnectable > OPipeImpl::getSuccessor() throw( RuntimeException )
405 {
406 return m_succ;
407 }
408
409
410 // XDataSource
setPredecessor(const Reference<XConnectable> & r)411 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
412 throw( RuntimeException )
413 {
414 if( r != m_pred ) {
415 m_pred = r;
416 if( m_pred.is() ) {
417 m_pred->setSuccessor(
418 Reference < XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
419 }
420 }
421 }
422
getPredecessor()423 Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException )
424 {
425 return m_pred;
426 }
427
428
429
430
431 // XServiceInfo
getImplementationName()432 OUString OPipeImpl::getImplementationName() throw( )
433 {
434 return OPipeImpl_getImplementationName();
435 }
436
437 // XServiceInfo
supportsService(const OUString & ServiceName)438 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw( )
439 {
440 Sequence< OUString > aSNL = getSupportedServiceNames();
441 const OUString * pArray = aSNL.getConstArray();
442
443 for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
444 if( pArray[i] == ServiceName )
445 return sal_True;
446
447 return sal_False;
448 }
449
450 // XServiceInfo
getSupportedServiceNames(void)451 Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw( )
452 {
453 return OPipeImpl_getSupportedServiceNames();
454 }
455
456
457
458
459
460 /* implementation functions
461 *
462 *
463 */
464
465
OPipeImpl_CreateInstance(const Reference<XComponentContext> &)466 Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance(
467 const Reference < XComponentContext > & ) throw(Exception)
468 {
469 OPipeImpl *p = new OPipeImpl;
470
471 return Reference < XInterface > ( SAL_STATIC_CAST( OWeakObject * , p ) );
472 }
473
474
OPipeImpl_getImplementationName()475 OUString OPipeImpl_getImplementationName()
476 {
477 return OUString( RTL_CONSTASCII_USTRINGPARAM ( IMPLEMENTATION_NAME ) );
478 }
479
OPipeImpl_getSupportedServiceNames(void)480 Sequence<OUString> OPipeImpl_getSupportedServiceNames(void)
481 {
482 Sequence<OUString> aRet(1);
483 aRet.getArray()[0] = OUString( RTL_CONSTASCII_USTRINGPARAM( SERVICE_NAME ));
484 return aRet;
485 }
486 }
487
488
489