1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 package com.sun.star.lib.uno.environments.remote;
25 
26 import com.sun.star.lib.uno.typedesc.MethodDescription;
27 import com.sun.star.lib.uno.typedesc.TypeDescription;
28 
29 import org.junit.Test;
30 import static org.junit.Assert.*;
31 
32 public final class JobQueue_Test {
33     @Test
testThreadLeavesJobQueueOnDispose0()34     public void testThreadLeavesJobQueueOnDispose0() throws InterruptedException
35     {
36         testThreadLeavesJobQueueOnDispose(0);
37     }
38 
39     @Test
testThreadLeavesJobQueueOnDispose5000()40     public void testThreadLeavesJobQueueOnDispose5000()
41         throws InterruptedException
42     {
43         testThreadLeavesJobQueueOnDispose(5000);
44     }
45 
testThreadLeavesJobQueueOnDispose(int waitTime)46     private void testThreadLeavesJobQueueOnDispose(int waitTime)
47         throws InterruptedException
48     {
49         TestThread t = new TestThread(waitTime);
50         t.waitToStart();
51         String msg = "xcxxxxxxxx";
52         t._jobQueue.dispose(t._disposeId, new RuntimeException (msg));
53         t.waitToTerminate();
54         assertTrue("", t._message.equals(msg));
55     }
56 
57     @Test
testThreadLeavesJobQueueOnReply0()58     public void testThreadLeavesJobQueueOnReply0() throws InterruptedException {
59         testThreadLeavesJobQueueOnReply(0);
60     }
61 
62     @Test
testThreadLeavesJobQueueOnReply5000()63     public void testThreadLeavesJobQueueOnReply5000()
64         throws InterruptedException
65     {
66         testThreadLeavesJobQueueOnReply(5000);
67     }
68 
testThreadLeavesJobQueueOnReply(int waitTime)69     private void testThreadLeavesJobQueueOnReply(int waitTime)
70         throws InterruptedException
71     {
72         TestThread t = new TestThread(waitTime);
73         t.waitToStart();
74         // put reply job:
75         t._jobQueue.putJob(
76             new Job(null, __iReceiver,
77                     new Message(
78                         null, false, "oid", __workAt_td, null, false, null,
79                         false, null, null)),
80             null);
81         t.waitToTerminate();
82         assertTrue("", true); // TODO! ???
83     }
84 
85     @Test
testStaticThreadExecutesJobs0()86     public void testStaticThreadExecutesJobs0() throws InterruptedException {
87         testStaticThreadExecutesJobs(0);
88     }
89 
90     @Test
testStaticThreadExecutesJobs5000()91     public void testStaticThreadExecutesJobs5000() throws InterruptedException {
92         testStaticThreadExecutesJobs(5000);
93     }
94 
testStaticThreadExecutesJobs(int waitTime)95     private void testStaticThreadExecutesJobs(int waitTime)
96         throws InterruptedException
97     {
98         TestThread t = new TestThread(waitTime);
99         t.waitToStart();
100         testExecuteJobs(t._jobQueue);
101         t._jobQueue.dispose(t._disposeId,
102                             new RuntimeException("xxxxxxxxxxxxx"));
103         t.waitToTerminate();
104     }
105 
106     @Test
testDynamicThreadExecutesJob()107     public void testDynamicThreadExecutesJob() throws InterruptedException {
108         testExecuteJobs(
109             new JobQueue(
110                 __javaThreadPoolFactory, ThreadId.createFresh(), true));
111     }
112 
113     @Test
testStaticThreadExecutesAsyncs()114     public void testStaticThreadExecutesAsyncs() throws InterruptedException {
115         TestThread t = new TestThread();
116         JobQueue async_jobQueue = new JobQueue(__javaThreadPoolFactory,
117                                                t._threadId);
118         assertTrue("", async_jobQueue._ref_count == 1);
119         t._jobQueue = __javaThreadPoolFactory.getJobQueue(t._threadId);
120         assertTrue("", t._jobQueue._ref_count == 1);
121         t.waitToStart();
122         TestWorkAt workAt = new TestWorkAt();
123         testAsyncJobQueue(workAt, async_jobQueue, t._threadId);
124         t._jobQueue.dispose(t._disposeId,
125                             new RuntimeException("xxxxxxxxxxxxx"));
126         t.waitToTerminate();
127         assertTrue("", workAt._async_counter == TestWorkAt.MESSAGES);
128         assertTrue("", workAt._sync_counter == TestWorkAt.MESSAGES);
129     }
130 
131     @Test
testDynamicThreadExecutesAsyncs()132     public void testDynamicThreadExecutesAsyncs() throws InterruptedException {
133         ThreadId threadId = ThreadId.createFresh();
134         JobQueue async_jobQueue = new JobQueue(__javaThreadPoolFactory,
135                                                threadId);
136         TestWorkAt workAt = new TestWorkAt();
137         testAsyncJobQueue(workAt, async_jobQueue, threadId);
138         assertTrue("", workAt._async_counter == TestWorkAt.MESSAGES);
139         assertTrue("", workAt._sync_counter == TestWorkAt.MESSAGES);
140     }
141 
testExecuteJobs(JobQueue jobQueue)142     private void testExecuteJobs(JobQueue jobQueue) throws InterruptedException
143     {
144         TestWorkAt workAt = new TestWorkAt();
145         testSendRequests(workAt, "increment", jobQueue);
146         synchronized (workAt) {
147             jobQueue.putJob(new Job(workAt, __iReceiver,
148                                     new Message(
149                                         null, true, "oid", __workAt_td,
150                                         ((MethodDescription)
151                                          __workAt_td.getMethodDescription(
152                                              "notifyme")),
153                                         true, null, false, null, null)),
154                             null);
155             while (!workAt._notified) {
156                 workAt.wait();
157             }
158         }
159         assertTrue("", workAt._counter == TestWorkAt.MESSAGES);
160     }
161 
testAsyncJobQueue(TestWorkAt workAt, JobQueue async_jobQueue, ThreadId threadId)162     private void testAsyncJobQueue(TestWorkAt workAt, JobQueue async_jobQueue,
163                                    ThreadId threadId)
164         throws InterruptedException
165     {
166         // put slow async calls first, followed by fast sync calls:
167         testSendRequests(workAt, "asyncCall", async_jobQueue);
168         testSendRequests(workAt, "syncCall",
169                          __javaThreadPoolFactory.getJobQueue(threadId));
170         synchronized (workAt) {
171             async_jobQueue._sync_jobQueue.putJob(
172                 new Job(workAt, __iReceiver,
173                         new Message(
174                             null, true, "oid", __workAt_td,
175                             ((MethodDescription)
176                              __workAt_td.getMethodDescription("notifyme")),
177                             true, null, false, null, null)),
178                 null);
179             while (!workAt._notified) {
180                 workAt.wait();
181             }
182         }
183         assertTrue("", workAt.passedAsyncTest());
184     }
185 
testSendRequests(TestWorkAt workAt, String operation, JobQueue jobQueue)186     private void testSendRequests(TestWorkAt workAt, String operation,
187                                   JobQueue jobQueue) {
188         Message iMessage = new Message(
189             null, true, "oid", __workAt_td,
190             (MethodDescription) __workAt_td.getMethodDescription(operation),
191             true, null, false, null, null);
192         for (int i = 0; i < TestWorkAt.MESSAGES; ++ i) {
193             Thread.yield(); // force scheduling
194             jobQueue.putJob(new Job(workAt, __iReceiver, iMessage),
195                             new Object());
196         }
197     }
198 
199     private static final class TestThread extends Thread {
200         public final ThreadId _threadId = JavaThreadPoolFactory.getThreadId();
201         public final Object _disposeId = new Object();
202         public JobQueue _jobQueue = null;
203         public String _message;
204 
TestThread(int waitTime)205         public TestThread(int waitTime) {
206             this.waitTime = waitTime;
207             _jobQueue = new JobQueue(__javaThreadPoolFactory, _threadId, false);
208         }
209 
TestThread()210         public TestThread() {
211             waitTime = 0;
212         }
213 
run()214         public void run() {
215             synchronized (lock) {
216                 state = STATE_STARTED;
217                 lock.notifyAll();
218             }
219             try {
220                 if (waitTime != 0) {
221                     Thread.sleep(waitTime);
222                 }
223                 _jobQueue.enter(_disposeId);
224             } catch (Throwable e) {
225                 _message = e.getMessage();
226             }
227             synchronized (lock) {
228                 state = STATE_DONE;
229                 lock.notifyAll();
230             }
231         }
232 
waitToStart()233         public void waitToStart() throws InterruptedException {
234             start();
235             synchronized (lock) {
236                 while (state == STATE_INITIAL) {
237                     lock.wait();
238                 }
239             }
240         }
241 
waitToTerminate()242         public void waitToTerminate() throws InterruptedException {
243             synchronized (lock) {
244                 while (state != STATE_DONE) {
245                     lock.wait();
246                 }
247             }
248             join();
249         }
250 
251         private final int waitTime;
252 
253         private final Object lock = new Object();
254         private int state = STATE_INITIAL;
255         private static final int STATE_INITIAL = 0;
256         private static final int STATE_STARTED = 1;
257         private static final int STATE_DONE = 2;
258     }
259 
260     private static final JavaThreadPoolFactory __javaThreadPoolFactory
261     = new JavaThreadPoolFactory();
262     private static final IReceiver __iReceiver = new TestReceiver();
263     private static final TypeDescription __workAt_td
264     = TypeDescription.getTypeDescription(TestIWorkAt.class);
265 }
266