1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 package com.sun.star.lib.uno.environments.remote;
25 
26 import com.sun.star.lib.uno.typedesc.MethodDescription;
27 import com.sun.star.lib.uno.typedesc.TypeDescription;
28 import complexlib.ComplexTestCase;
29 
30 public final class JobQueue_Test extends ComplexTestCase {
getTestObjectName()31     public String getTestObjectName() {
32         return getClass().getName();
33     }
34 
getTestMethodNames()35     public String[] getTestMethodNames() {
36         return new String[] { "testThreadLeavesJobQueueOnDispose0",
37                               "testThreadLeavesJobQueueOnDispose5000",
38                               "testThreadLeavesJobQueueOnReply0",
39                               "testThreadLeavesJobQueueOnReply5000",
40                               "testStaticThreadExecutesJobs0",
41                               "testStaticThreadExecutesJobs5000",
42                               "testDynamicThreadExecutesJob",
43                               "testStaticThreadExecutesAsyncs",
44                               "testDynamicThreadExecutesAsyncs" };
45     }
46 
testThreadLeavesJobQueueOnDispose0()47     public void testThreadLeavesJobQueueOnDispose0() throws InterruptedException
48     {
49         testThreadLeavesJobQueueOnDispose(0);
50     }
51 
testThreadLeavesJobQueueOnDispose5000()52     public void testThreadLeavesJobQueueOnDispose5000()
53         throws InterruptedException
54     {
55         testThreadLeavesJobQueueOnDispose(5000);
56     }
57 
testThreadLeavesJobQueueOnDispose(int waitTime)58     private void testThreadLeavesJobQueueOnDispose(int waitTime)
59         throws InterruptedException
60     {
61         TestThread t = new TestThread(waitTime);
62         t.waitToStart();
63         String msg = "xcxxxxxxxx";
64         t._jobQueue.dispose(t._disposeId, new RuntimeException (msg));
65         t.waitToTerminate();
66         assure("", t._message.equals(msg));
67     }
68 
testThreadLeavesJobQueueOnReply0()69     public void testThreadLeavesJobQueueOnReply0() throws InterruptedException {
70         testThreadLeavesJobQueueOnReply(0);
71     }
72 
testThreadLeavesJobQueueOnReply5000()73     public void testThreadLeavesJobQueueOnReply5000()
74         throws InterruptedException
75     {
76         testThreadLeavesJobQueueOnReply(5000);
77     }
78 
testThreadLeavesJobQueueOnReply(int waitTime)79     private void testThreadLeavesJobQueueOnReply(int waitTime)
80         throws InterruptedException
81     {
82         TestThread t = new TestThread(waitTime);
83         t.waitToStart();
84         // put reply job:
85         t._jobQueue.putJob(
86             new Job(null, __iReceiver,
87                     new Message(
88                         null, false, "oid", __workAt_td, null, false, null,
89                         false, null, null)),
90             null);
91         t.waitToTerminate();
92         assure("", true); // TODO! ???
93     }
94 
testStaticThreadExecutesJobs0()95     public void testStaticThreadExecutesJobs0() throws InterruptedException {
96         testStaticThreadExecutesJobs(0);
97     }
98 
testStaticThreadExecutesJobs5000()99     public void testStaticThreadExecutesJobs5000() throws InterruptedException {
100         testStaticThreadExecutesJobs(5000);
101     }
102 
testStaticThreadExecutesJobs(int waitTime)103     private void testStaticThreadExecutesJobs(int waitTime)
104         throws InterruptedException
105     {
106         TestThread t = new TestThread(waitTime);
107         t.waitToStart();
108         testExecuteJobs(t._jobQueue);
109         t._jobQueue.dispose(t._disposeId,
110                             new RuntimeException("xxxxxxxxxxxxx"));
111         t.waitToTerminate();
112     }
113 
testDynamicThreadExecutesJob()114     public void testDynamicThreadExecutesJob() throws InterruptedException {
115         testExecuteJobs(
116             new JobQueue(
117                 __javaThreadPoolFactory, ThreadId.createFresh(), true));
118     }
119 
testStaticThreadExecutesAsyncs()120     public void testStaticThreadExecutesAsyncs() throws InterruptedException {
121         TestThread t = new TestThread();
122         JobQueue async_jobQueue = new JobQueue(__javaThreadPoolFactory,
123                                                t._threadId);
124         assure("", async_jobQueue._ref_count == 1);
125         t._jobQueue = __javaThreadPoolFactory.getJobQueue(t._threadId);
126         assure("", t._jobQueue._ref_count == 1);
127         t.waitToStart();
128         TestWorkAt workAt = new TestWorkAt();
129         testAsyncJobQueue(workAt, async_jobQueue, t._threadId);
130         t._jobQueue.dispose(t._disposeId,
131                             new RuntimeException("xxxxxxxxxxxxx"));
132         t.waitToTerminate();
133         assure("", workAt._async_counter == TestWorkAt.MESSAGES);
134         assure("", workAt._sync_counter == TestWorkAt.MESSAGES);
135     }
136 
testDynamicThreadExecutesAsyncs()137     public void testDynamicThreadExecutesAsyncs() throws InterruptedException {
138         ThreadId threadId = ThreadId.createFresh();
139         JobQueue async_jobQueue = new JobQueue(__javaThreadPoolFactory,
140                                                threadId);
141         TestWorkAt workAt = new TestWorkAt();
142         testAsyncJobQueue(workAt, async_jobQueue, threadId);
143         assure("", workAt._async_counter == TestWorkAt.MESSAGES);
144         assure("", workAt._sync_counter == TestWorkAt.MESSAGES);
145     }
146 
testExecuteJobs(JobQueue jobQueue)147     private void testExecuteJobs(JobQueue jobQueue) throws InterruptedException
148     {
149         TestWorkAt workAt = new TestWorkAt();
150         testSendRequests(workAt, "increment", jobQueue);
151         synchronized (workAt) {
152             jobQueue.putJob(new Job(workAt, __iReceiver,
153                                     new Message(
154                                         null, true, "oid", __workAt_td,
155                                         ((MethodDescription)
156                                          __workAt_td.getMethodDescription(
157                                              "notifyme")),
158                                         true, null, false, null, null)),
159                             null);
160             while (!workAt._notified) {
161                 workAt.wait();
162             }
163         }
164         assure("", workAt._counter == TestWorkAt.MESSAGES);
165     }
166 
testAsyncJobQueue(TestWorkAt workAt, JobQueue async_jobQueue, ThreadId threadId)167     private void testAsyncJobQueue(TestWorkAt workAt, JobQueue async_jobQueue,
168                                    ThreadId threadId)
169         throws InterruptedException
170     {
171         // put slow async calls first, followed by fast sync calls:
172         testSendRequests(workAt, "asyncCall", async_jobQueue);
173         testSendRequests(workAt, "syncCall",
174                          __javaThreadPoolFactory.getJobQueue(threadId));
175         synchronized (workAt) {
176             async_jobQueue._sync_jobQueue.putJob(
177                 new Job(workAt, __iReceiver,
178                         new Message(
179                             null, true, "oid", __workAt_td,
180                             ((MethodDescription)
181                              __workAt_td.getMethodDescription("notifyme")),
182                             true, null, false, null, null)),
183                 null);
184             while (!workAt._notified) {
185                 workAt.wait();
186             }
187         }
188         assure("", workAt.passedAsyncTest());
189     }
190 
testSendRequests(TestWorkAt workAt, String operation, JobQueue jobQueue)191     private void testSendRequests(TestWorkAt workAt, String operation,
192                                   JobQueue jobQueue) {
193         Message iMessage = new Message(
194             null, true, "oid", __workAt_td,
195             (MethodDescription) __workAt_td.getMethodDescription(operation),
196             true, null, false, null, null);
197         for (int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
198             Thread.yield(); // force scheduling
199             jobQueue.putJob(new Job(workAt, __iReceiver, iMessage),
200                             new Object());
201         }
202     }
203 
204     private static final class TestThread extends Thread {
205         public final ThreadId _threadId = JavaThreadPoolFactory.getThreadId();
206         public final Object _disposeId = new Object();
207         public JobQueue _jobQueue = null;
208         public String _message;
209 
TestThread(int waitTime)210         public TestThread(int waitTime) {
211             this.waitTime = waitTime;
212             _jobQueue = new JobQueue(__javaThreadPoolFactory, _threadId, false);
213         }
214 
TestThread()215         public TestThread() {
216             waitTime = 0;
217         }
218 
run()219         public void run() {
220             synchronized (lock) {
221                 state = STATE_STARTED;
222                 lock.notifyAll();
223             }
224             try {
225                 if (waitTime != 0) {
226                     Thread.sleep(waitTime);
227                 }
228                 _jobQueue.enter(_disposeId);
229             } catch (Throwable e) {
230                 _message = e.getMessage();
231             }
232             synchronized (lock) {
233                 state = STATE_DONE;
234                 lock.notifyAll();
235             }
236         }
237 
waitToStart()238         public void waitToStart() throws InterruptedException {
239             start();
240             synchronized (lock) {
241                 while (state == STATE_INITIAL) {
242                     lock.wait();
243                 }
244             }
245         }
246 
waitToTerminate()247         public void waitToTerminate() throws InterruptedException {
248             synchronized (lock) {
249                 while (state != STATE_DONE) {
250                     lock.wait();
251                 }
252             }
253             join();
254         }
255 
256         private final int waitTime;
257 
258         private final Object lock = new Object();
259         private int state = STATE_INITIAL;
260         private static final int STATE_INITIAL = 0;
261         private static final int STATE_STARTED = 1;
262         private static final int STATE_DONE = 2;
263     }
264 
265     private static final JavaThreadPoolFactory __javaThreadPoolFactory
266     = new JavaThreadPoolFactory();
267     private static final IReceiver __iReceiver = new TestReceiver();
268     private static final TypeDescription __workAt_td
269     = TypeDescription.getTypeDescription(TestIWorkAt.class);
270 }
271