1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 package com.sun.star.lib.uno.environments.remote;
25 
26 import com.sun.star.lib.uno.typedesc.MethodDescription;
27 import com.sun.star.lib.uno.typedesc.TypeDescription;
28 import complexlib.ComplexTestCase;
29 
30 public class ThreadPool_Test extends ComplexTestCase {
getTestObjectName()31     public String getTestObjectName() {
32         return getClass().getName();
33     }
34 
getTestMethodNames()35     public String[] getTestMethodNames() {
36         return new String[] { "testDispose",
37                               "testThreadAsync",
38                               "testDynamicThreadSync",
39                               "testStaticThreadSync",
40                               "testDynamicThreadAsyncSyncOrder",
41                               "testStaticThreadAsyncSyncOrder",
42                               "testStress",
43                               "testAsyncSync" };
44     }
45 
testDispose()46     public void testDispose() throws InterruptedException {
47         IThreadPool iThreadPool = ThreadPoolManager.create();
48         TestThread testThread = new TestThread(iThreadPool);
49 
50         ThreadId threadId = null;
51 
52         // start the test thread
53         synchronized(testThread) {
54             testThread.start();
55 
56             testThread.wait();
57 
58             threadId = testThread._threadId;
59 
60             // let the thread attach and enter the threadpool
61             testThread.notifyAll();
62         }
63 
64         String message = "blabla";
65 
66         // terminate the test thread
67         synchronized(testThread) {
68             // put reply job
69             iThreadPool.dispose(new RuntimeException(message));
70 
71             testThread.wait();
72         }
73 
74         testThread.join();
75 
76         assure("", testThread._message.equals(message));
77     }
78 
testThreadAsync()79     public void testThreadAsync() throws InterruptedException {
80         TestWorkAt workAt = new TestWorkAt();
81 
82         ThreadId threadId = ThreadId.createFresh();
83 
84         // queue asyncs
85         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
86             Thread.yield(); // force scheduling
87             putJob(workAt, false, threadId, "increment");
88         }
89 
90         synchronized(workAt) {
91             putJob(workAt, false, threadId, "notifyme");
92 
93             while(!workAt._notified)
94                 workAt.wait();
95         }
96 
97         assure("", workAt._counter == TestWorkAt.MESSAGES);
98     }
99 
testDynamicThreadSync()100     public void testDynamicThreadSync() throws InterruptedException {
101         TestWorkAt workAt = new TestWorkAt();
102 
103         ThreadId threadId = ThreadId.createFresh();
104 
105         // queue asyncs
106         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
107             Thread.yield(); // force scheduling
108             putJob(workAt, true, threadId, "increment");
109         }
110 
111         synchronized(workAt) {
112             putJob(workAt, true, threadId, "notifyme");
113 
114             while(!workAt._notified)
115                 workAt.wait();
116         }
117 
118         assure("", workAt._counter == TestWorkAt.MESSAGES);
119     }
120 
testStaticThreadSync()121     public void testStaticThreadSync() throws InterruptedException {
122         TestWorkAt workAt = new TestWorkAt();
123 
124         TestThread testThread = new TestThread();
125 
126         ThreadId threadId = null;
127 
128         // start the test thread
129         synchronized(testThread) {
130             testThread.start();
131 
132             testThread.wait();
133 
134             threadId = testThread._threadId;
135 
136             // let the thread attach and enter the threadpool
137             testThread.notifyAll();
138         }
139 
140         // queue syncs
141         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
142             Thread.yield(); // force scheduling
143             putJob(workAt, true, threadId, "increment");
144         }
145 
146         // terminate the test thread
147         synchronized(testThread) {
148             // put reply job
149             putJob(workAt, true, threadId, null);
150 
151             testThread.wait();
152         }
153 
154         testThread.join();
155 
156         assure("", workAt._counter == TestWorkAt.MESSAGES);
157     }
158 
testDynamicThreadAsyncSyncOrder()159     public void testDynamicThreadAsyncSyncOrder() throws InterruptedException {
160         TestWorkAt workAt = new TestWorkAt();
161 
162         ThreadId threadId = ThreadId.createFresh();
163 
164         // queue asyncs
165         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
166             Thread.yield(); // force scheduling
167             putJob(workAt, false, threadId, "asyncCall");
168         }
169 
170         // queue syncs
171         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
172             Thread.yield(); // force scheduling
173             putJob(workAt, true, threadId, "syncCall");
174         }
175 
176         synchronized(workAt) {
177             putJob(workAt, true, threadId, "notifyme");
178 
179             while(!workAt._notified)
180                 workAt.wait();
181         }
182 
183         assure("", workAt.passedAsyncTest());
184     }
185 
testStaticThreadAsyncSyncOrder()186     public void testStaticThreadAsyncSyncOrder() throws InterruptedException {
187         TestWorkAt workAt = new TestWorkAt();
188 
189         TestThread testThread = new TestThread();
190 
191         // start the test thread
192         synchronized(testThread) {
193             testThread.start();
194 
195             testThread.wait();
196         }
197 
198         ThreadId threadId = testThread._threadId;
199 
200         // queue asyncs
201         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
202             Thread.yield(); // force scheduling
203             putJob(workAt, false, threadId, "asyncCall");
204         }
205 
206         // let the thread attach and enter the threadpool
207         synchronized(testThread) {
208             testThread.notifyAll();
209         }
210 
211         // queue syncs
212         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
213             Thread.yield(); // force scheduling
214             putJob(workAt, true, threadId, "syncCall");
215         }
216 
217         // terminate the test thread
218         synchronized(testThread) {
219             // put reply job
220             putJob(workAt, true, threadId, null);
221 
222             testThread.wait();
223         }
224 
225         testThread.join();
226 
227         assure("", workAt.passedAsyncTest());
228     }
229 
testStress()230     public void testStress() throws InterruptedException {
231         TestWorkAt workAt = new TestWorkAt();
232         for (int i = 0; i < TestWorkAt.MESSAGES; ++i) {
233             Thread.yield(); // force scheduling
234             ThreadId threadID = ThreadId.createFresh();
235             putJob(workAt, true, threadID, "increment");
236             putJob(workAt, false, threadID, "increment");
237         }
238         synchronized (workAt) {
239             while (workAt._counter < 2 * TestWorkAt.MESSAGES) {
240                 workAt.wait();
241             }
242         }
243 
244         abstract class Stress extends Thread {
245             public Stress(int count) {
246                 this.count = count;
247             }
248 
249             public void run() {
250                 try {
251                     for (int i = 0; i < count; ++i) {
252                         runTest();
253                     }
254                 } catch (Throwable e) {
255                     e.printStackTrace(System.err);
256                 }
257             }
258 
259             protected abstract void runTest() throws InterruptedException;
260 
261             private final int count;
262         }
263 
264         Stress stress1 = new Stress(50) {
265                 protected void runTest() throws InterruptedException {
266                     testThreadAsync();
267                 }
268             };
269         stress1.start();
270 
271         Stress stress2 = new Stress(50) {
272                 protected void runTest() throws InterruptedException {
273                     testDynamicThreadSync();
274                 }
275             };
276         stress2.start();
277 
278         Stress stress3 = new Stress(50) {
279                 protected void runTest() throws InterruptedException {
280                     testStaticThreadSync();
281                 }
282             };
283         stress3.start();
284 
285         Stress stress4 = new Stress(50) {
286                 protected void runTest() throws InterruptedException {
287                     testDynamicThreadAsyncSyncOrder();
288                 }
289             };
290         stress4.start();
291 
292         Stress stress5 = new Stress(50) {
293                 protected void runTest() throws InterruptedException {
294                     testStaticThreadAsyncSyncOrder();
295                 }
296             };
297         stress5.start();
298 
299         Stress stress6 = new Stress(500) {
300                 protected void runTest() throws InterruptedException {
301                     testDispose();
302                 }
303             };
304         stress6.start();
305 
306         stress1.join();
307         stress2.join();
308         stress3.join();
309         stress4.join();
310         stress5.join();
311         stress6.join();
312     }
313 
testAsyncSync()314     public void testAsyncSync() throws InterruptedException {
315         TestWorkAt workAt = new TestWorkAt();
316         ThreadId threadId = ThreadId.createFresh();
317         MyWorkAt myWorkAt = new MyWorkAt( workAt );
318 
319         // queue asyncs
320         for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
321             if( i == 2 )
322             {
323                 putJob( myWorkAt, false , threadId, "asyncCall" );
324             }
325             putJob(workAt, false, threadId, "asyncCall");
326         }
327 
328         synchronized(workAt) {
329             putJob(workAt, false, threadId, "notifyme");
330 
331             while(!workAt._notified)
332                 workAt.wait();
333         }
334 
335         assure("",
336                workAt._async_counter == TestWorkAt.MESSAGES
337                && myWorkAt._success);
338     }
339 
putJob(TestIWorkAt iWorkAt, boolean synchron, ThreadId threadId, String operation)340     private static void putJob(TestIWorkAt iWorkAt, boolean synchron,
341                                ThreadId threadId, String operation) {
342         __iThreadPool.putJob(
343             new Job(iWorkAt, __iReceiver,
344                     new Message(
345                         threadId, operation != null, "oid", __workAt_td,
346                         (operation == null
347                          ? null
348                          : ((MethodDescription)
349                             __workAt_td.getMethodDescription(operation))),
350                         synchron, null, false, null, null)));
351     }
352 
353     private static final class TestThread extends Thread {
354         ThreadId _threadId;
355         Object _disposeId = new Object();
356         String _message;
357         IThreadPool _iThreadPool;
358 
TestThread()359         TestThread() {
360             this(__iThreadPool);
361         }
362 
TestThread(IThreadPool iThreadPool)363         TestThread(IThreadPool iThreadPool) {
364             _iThreadPool = iThreadPool;
365         }
366 
run()367         public void run() {
368             _threadId = _iThreadPool.getThreadId();
369 
370 
371             try {
372                 synchronized(this) {
373                     // notify that we are running
374                     notify();
375 
376                     _iThreadPool.attach();
377 
378                     // wait until we should continue
379                     wait();
380                 }
381 
382                 _iThreadPool.enter();
383             }
384             catch(Throwable throwable) {
385                 _message = throwable.getMessage();
386             }
387 
388             _iThreadPool.detach();
389 
390             synchronized(this) {
391                 // notify the listeners that we are dying
392                 notifyAll();
393             }
394         }
395     }
396 
397     private static final class MyWorkAt implements TestIWorkAt {
MyWorkAt( TestWorkAt async_WorkAt )398         public MyWorkAt( TestWorkAt async_WorkAt ) {
399             _async_WorkAt = async_WorkAt;
400         }
401 
syncCall()402         public void syncCall() throws Throwable
403         {
404             Message iMessage = new Message(
405                 __iThreadPool.getThreadId(), false, "oid", __workAt_td, null,
406                 false, null, false, null, null);
407 
408             // marshal reply
409             ThreadPool_Test.__iThreadPool.putJob(
410                 new Job(this, ThreadPool_Test. __iReceiver, iMessage));
411         }
412 
asyncCall()413         public  void asyncCall() throws Throwable {
414             for (int i = 0 ; i < 5 ; ++i) {
415                 ThreadPool_Test.__iThreadPool.attach();
416                 ThreadPool_Test.putJob(this, true, __iThreadPool.getThreadId(),
417                                        "syncCall");
418                 // wait for reply
419                 ThreadPool_Test.__iThreadPool.enter();
420                 ThreadPool_Test.__iThreadPool.detach();
421             }
422             // async must have waited for this call
423             _success = _async_WorkAt._async_counter == 2;
424         }
425 
increment()426         public void increment() throws Throwable {}
427 
notifyme()428         public void notifyme() {}
429 
430         public boolean _success = false;
431 
432         private final TestWorkAt _async_WorkAt;
433     }
434 
435     private static final IThreadPool __iThreadPool = ThreadPoolManager.create();
436     private static final IReceiver __iReceiver = new TestReceiver();
437     private static final TypeDescription __workAt_td
438     = TypeDescription.getTypeDescription(TestIWorkAt.class);
439 }
440