1#**************************************************************
2#
3#  Licensed to the Apache Software Foundation (ASF) under one
4#  or more contributor license agreements.  See the NOTICE file
5#  distributed with this work for additional information
6#  regarding copyright ownership.  The ASF licenses this file
7#  to you under the Apache License, Version 2.0 (the
8#  "License"); you may not use this file except in compliance
9#  with the License.  You may obtain a copy of the License at
10#
11#    http://www.apache.org/licenses/LICENSE-2.0
12#
13#  Unless required by applicable law or agreed to in writing,
14#  software distributed under the License is distributed on an
15#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16#  KIND, either express or implied.  See the License for the
17#  specific language governing permissions and limitations
18#  under the License.
19#
20#**************************************************************
21"tests bridging python implementations of UNO objects"
22import unittest
23import uno
24import unohelper
25import os
26import sys
27
28from com.sun.star.io import XOutputStream, XInputStream, typeOfXOutputStream, typeOfXInputStream
29from com.sun.star.lang import XTypeProvider, typeOfXTypeProvider, XEventListener
30from com.sun.star.uno import XCurrentContext
31
32class SequenceOutputStream( unohelper.Base, XOutputStream ):
33      def __init__( self ):
34          self.s = uno.ByteSequence("")
35          self.closed = 0
36
37      def closeOutput(self):
38          self.closed = 1
39
40      def writeBytes( self, seq ):
41          self.s = self.s + seq
42
43      def flush( self ):
44          pass
45
46      def getSequence( self ):
47          return self.s
48
49
50class SequenceInputStream( XInputStream, unohelper.Base ):
51      def __init__( self, seq ):
52          self.s = seq
53          self.nIndex = 0
54          self.closed = 0
55
56      def closeInput( self):
57          self.closed = 1
58          self.s = None
59
60      def skipBytes( self, nByteCount ):
61          if( nByteCount + self.nIndex > len(self.s) ):
62              nByteCount = len(self.s) - self.nIndex
63          self.nIndex += nByteCount
64
65      def readBytes( self, retSeq, nByteCount ):
66          nRet = 0
67          if( self.nIndex + nByteCount > len(self.s) ):
68              nRet = len(self.s) - self.nIndex
69          else:
70              nRet = nByteCount
71          retSeq = uno.ByteSequence(self.s.value[self.nIndex : self.nIndex + nRet ])
72          self.nIndex = self.nIndex + nRet
73          return nRet, retSeq
74
75      def readSomeBytes( self, retSeq , nByteCount ):
76          #as we never block !
77          return readBytes( retSeq, nByteCount )
78
79      def available( self ):
80          return len( self.s ) - self.nIndex
81
82class SequenceInputStream2( SequenceInputStream ):
83      def __init__( self, seq ):
84            SequenceInputStream.__init__( self, seq )
85
86class TestCase(unittest.TestCase):
87      def __init__(self,method,ctx):
88          unittest.TestCase.__init__(self,method)
89          self.ctx = ctx
90
91      def setUp(self):
92          self.tobj = self.ctx.ServiceManager.createInstanceWithContext( \
93                           "com.sun.star.test.bridge.CppTestObject",self.ctx)
94          self.pipe = self.ctx.ServiceManager.createInstanceWithContext( \
95                           "com.sun.star.io.Pipe" , self.ctx )
96
97      def testStandard( self ):
98          dataOut = self.ctx.ServiceManager.createInstanceWithContext( \
99                        "com.sun.star.io.DataOutputStream", self.ctx )
100          streamOut = SequenceOutputStream()
101          dataOut.setOutputStream( streamOut )
102          dataOut.writeShort( 42 )
103          dataOut.writeLong( 43 )
104          dataOut.closeOutput()
105
106          dataInput = self.ctx.ServiceManager.createInstanceWithContext( \
107                   "com.sun.star.io.DataInputStream", self.ctx )
108
109          dataInput.setInputStream( SequenceInputStream2( streamOut.getSequence() ) )
110
111          self.failUnless( 42 == dataInput.readShort() )
112          self.failUnless( 43 == dataInput.readLong() )
113          self.failUnless( self.tobj.transportAny( streamOut ) == streamOut )
114
115
116class NullDevice:
117      def write( self, string ):
118            pass
119
120
121class EventListener( unohelper.Base, XEventListener ):
122    def __init__( self ):
123        self.disposingCalled = False
124
125    def disposing( self , eventObject ):
126        self.disposingCalled = True
127
128class TestHelperCase( unittest.TestCase ):
129
130      def __init__(self,method):
131            unittest.TestCase.__init__(self,method)
132
133      def testUrlHelper( self ):
134            systemPath = os.getcwd()
135            if systemPath.startswith( "/" ):
136                  self.failUnless( "/tmp" == unohelper.fileUrlToSystemPath( "file:///tmp" ) )
137                  self.failUnless( "file:///tmp" == unohelper.systemPathToFileUrl( "/tmp" ))
138            else:
139                  self.failUnless( "c:\\temp" == unohelper.fileUrlToSystemPath( "file:///c:/temp" ) )
140                  self.failUnless( "file:///c:/temp" == unohelper.systemPathToFileUrl( "c:\\temp" ) )
141
142            systemPath = unohelper.systemPathToFileUrl( systemPath )
143            self.failUnless( systemPath + "/a" == unohelper.absolutize( systemPath, "a" ) )
144      def testInspect( self ):
145            dev = NullDevice()
146#            dev = sys.stdout
147            unohelper.inspect( uno.getComponentContext() , dev )
148            unohelper.inspect( uno.getComponentContext().ServiceManager , dev )
149            unohelper.inspect( uno.getTypeByName( "com.sun.star.lang.XComponent" ) , dev )
150
151      def testListener( self ):
152            smgr = uno.getComponentContext().ServiceManager.createInstance(
153                  "com.sun.star.lang.ServiceManager" )
154
155            # check, whether listeners
156            listener = EventListener()
157            smgr.addEventListener( listener )
158            smgr.dispose()
159            self.failUnless( listener.disposingCalled )
160
161            # check, whether listeners can be removed
162            smgr = uno.getComponentContext().ServiceManager.createInstance(
163                  "com.sun.star.lang.ServiceManager" )
164            listener = EventListener()
165            smgr.addEventListener( listener )
166            smgr.removeEventListener( listener )
167            smgr.dispose()
168            self.failUnless( not listener.disposingCalled )
169
170      def testCurrentContext( self ):
171            oldContext = uno.getCurrentContext()
172            try:
173                  uno.setCurrentContext(
174                        unohelper.CurrentContext( oldContext,{"My42":42}) )
175                  self.failUnless( 42 == uno.getCurrentContext().getValueByName( "My42" ) )
176                  self.failUnless( None == uno.getCurrentContext().getValueByName( "My43" ) )
177            finally:
178                  uno.setCurrentContext( oldContext )
179
180
181
182def suite( ctx ):
183    suite = unittest.TestSuite()
184    suite.addTest(TestCase("testStandard",ctx))
185    suite.addTest(TestHelperCase( "testUrlHelper" ))
186    suite.addTest(TestHelperCase( "testInspect" ))
187    suite.addTest(TestHelperCase( "testListener" ) )
188    suite.addTest(TestHelperCase( "testCurrentContext" ) )
189    return suite
190
191