1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 package com.sun.star.lib.uno.environments.remote; 25 26 import com.sun.star.lib.uno.typedesc.MethodDescription; 27 import com.sun.star.lib.uno.typedesc.TypeDescription; 28 29 import org.junit.Test; 30 import static org.junit.Assert.*; 31 32 public class ThreadPool_Test { 33 @Test testDispose()34 public void testDispose() throws InterruptedException { 35 IThreadPool iThreadPool = ThreadPoolManager.create(); 36 TestThread testThread = new TestThread(iThreadPool); 37 38 ThreadId threadId = null; 39 40 // start the test thread 41 synchronized(testThread) { 42 testThread.start(); 43 44 testThread.wait(); 45 46 threadId = testThread._threadId; 47 48 // let the thread attach and enter the threadpool 49 testThread.notifyAll(); 50 } 51 52 String message = "blabla"; 53 54 // terminate the test thread 55 synchronized(testThread) { 56 // put reply job 57 iThreadPool.dispose(new RuntimeException(message)); 58 59 testThread.wait(); 60 } 61 62 testThread.join(); 63 64 assertTrue("", testThread._message.equals(message)); 65 } 66 67 @Test testThreadAsync()68 public void testThreadAsync() throws InterruptedException { 69 TestWorkAt workAt = new TestWorkAt(); 70 71 ThreadId threadId = ThreadId.createFresh(); 72 73 // queue asyncs 74 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 75 Thread.yield(); // force scheduling 76 putJob(workAt, false, threadId, "increment"); 77 } 78 79 synchronized(workAt) { 80 putJob(workAt, false, threadId, "notifyme"); 81 82 while(!workAt._notified) 83 workAt.wait(); 84 } 85 86 assertTrue("", workAt._counter == TestWorkAt.MESSAGES); 87 } 88 89 @Test testDynamicThreadSync()90 public void testDynamicThreadSync() throws InterruptedException { 91 TestWorkAt workAt = new TestWorkAt(); 92 93 ThreadId threadId = ThreadId.createFresh(); 94 95 // queue asyncs 96 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 97 Thread.yield(); // force scheduling 98 putJob(workAt, true, threadId, "increment"); 99 } 100 101 synchronized(workAt) { 102 putJob(workAt, true, threadId, "notifyme"); 103 104 while(!workAt._notified) 105 workAt.wait(); 106 } 107 108 assertTrue("", workAt._counter == TestWorkAt.MESSAGES); 109 } 110 111 @Test testStaticThreadSync()112 public void testStaticThreadSync() throws InterruptedException { 113 TestWorkAt workAt = new TestWorkAt(); 114 115 TestThread testThread = new TestThread(); 116 117 ThreadId threadId = null; 118 119 // start the test thread 120 synchronized(testThread) { 121 testThread.start(); 122 123 testThread.wait(); 124 125 threadId = testThread._threadId; 126 127 // let the thread attach and enter the threadpool 128 testThread.notifyAll(); 129 } 130 131 // queue syncs 132 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 133 Thread.yield(); // force scheduling 134 putJob(workAt, true, threadId, "increment"); 135 } 136 137 // terminate the test thread 138 synchronized(testThread) { 139 // put reply job 140 putJob(workAt, true, threadId, null); 141 142 testThread.wait(); 143 } 144 145 testThread.join(); 146 147 assertTrue("", workAt._counter == TestWorkAt.MESSAGES); 148 } 149 150 @Test testDynamicThreadAsyncSyncOrder()151 public void testDynamicThreadAsyncSyncOrder() throws InterruptedException { 152 TestWorkAt workAt = new TestWorkAt(); 153 154 ThreadId threadId = ThreadId.createFresh(); 155 156 // queue asyncs 157 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 158 Thread.yield(); // force scheduling 159 putJob(workAt, false, threadId, "asyncCall"); 160 } 161 162 // queue syncs 163 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 164 Thread.yield(); // force scheduling 165 putJob(workAt, true, threadId, "syncCall"); 166 } 167 168 synchronized(workAt) { 169 putJob(workAt, true, threadId, "notifyme"); 170 171 while(!workAt._notified) 172 workAt.wait(); 173 } 174 175 assertTrue("", workAt.passedAsyncTest()); 176 } 177 178 @Test testStaticThreadAsyncSyncOrder()179 public void testStaticThreadAsyncSyncOrder() throws InterruptedException { 180 TestWorkAt workAt = new TestWorkAt(); 181 182 TestThread testThread = new TestThread(); 183 184 // start the test thread 185 synchronized(testThread) { 186 testThread.start(); 187 188 testThread.wait(); 189 } 190 191 ThreadId threadId = testThread._threadId; 192 193 // queue asyncs 194 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 195 Thread.yield(); // force scheduling 196 putJob(workAt, false, threadId, "asyncCall"); 197 } 198 199 // let the thread attach and enter the threadpool 200 synchronized(testThread) { 201 testThread.notifyAll(); 202 } 203 204 // queue syncs 205 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 206 Thread.yield(); // force scheduling 207 putJob(workAt, true, threadId, "syncCall"); 208 } 209 210 // terminate the test thread 211 synchronized(testThread) { 212 // put reply job 213 putJob(workAt, true, threadId, null); 214 215 testThread.wait(); 216 } 217 218 testThread.join(); 219 220 assertTrue("", workAt.passedAsyncTest()); 221 } 222 223 @Test testStress()224 public void testStress() throws InterruptedException { 225 TestWorkAt workAt = new TestWorkAt(); 226 for (int i = 0; i < TestWorkAt.MESSAGES; ++i) { 227 Thread.yield(); // force scheduling 228 ThreadId threadID = ThreadId.createFresh(); 229 putJob(workAt, true, threadID, "increment"); 230 putJob(workAt, false, threadID, "increment"); 231 } 232 synchronized (workAt) { 233 while (workAt._counter < 2 * TestWorkAt.MESSAGES) { 234 workAt.wait(); 235 } 236 } 237 238 abstract class Stress extends Thread { 239 public Stress(int count) { 240 this.count = count; 241 } 242 243 public void run() { 244 try { 245 for (int i = 0; i < count; ++i) { 246 runTest(); 247 } 248 } catch (Throwable e) { 249 e.printStackTrace(System.err); 250 } 251 } 252 253 protected abstract void runTest() throws InterruptedException; 254 255 private final int count; 256 } 257 258 Stress stress1 = new Stress(50) { 259 protected void runTest() throws InterruptedException { 260 testThreadAsync(); 261 } 262 }; 263 stress1.start(); 264 265 Stress stress2 = new Stress(50) { 266 protected void runTest() throws InterruptedException { 267 testDynamicThreadSync(); 268 } 269 }; 270 stress2.start(); 271 272 Stress stress3 = new Stress(50) { 273 protected void runTest() throws InterruptedException { 274 testStaticThreadSync(); 275 } 276 }; 277 stress3.start(); 278 279 Stress stress4 = new Stress(50) { 280 protected void runTest() throws InterruptedException { 281 testDynamicThreadAsyncSyncOrder(); 282 } 283 }; 284 stress4.start(); 285 286 Stress stress5 = new Stress(50) { 287 protected void runTest() throws InterruptedException { 288 testStaticThreadAsyncSyncOrder(); 289 } 290 }; 291 stress5.start(); 292 293 Stress stress6 = new Stress(500) { 294 protected void runTest() throws InterruptedException { 295 testDispose(); 296 } 297 }; 298 stress6.start(); 299 300 stress1.join(); 301 stress2.join(); 302 stress3.join(); 303 stress4.join(); 304 stress5.join(); 305 stress6.join(); 306 } 307 308 @Test testAsyncSync()309 public void testAsyncSync() throws InterruptedException { 310 TestWorkAt workAt = new TestWorkAt(); 311 ThreadId threadId = ThreadId.createFresh(); 312 MyWorkAt myWorkAt = new MyWorkAt( workAt ); 313 314 // queue asyncs 315 for(int i = 0; i < TestWorkAt.MESSAGES; ++ i) { 316 if( i == 2 ) 317 { 318 putJob( myWorkAt, false , threadId, "asyncCall" ); 319 } 320 putJob(workAt, false, threadId, "asyncCall"); 321 } 322 323 synchronized(workAt) { 324 putJob(workAt, false, threadId, "notifyme"); 325 326 while(!workAt._notified) 327 workAt.wait(); 328 } 329 330 assertTrue("", 331 workAt._async_counter == TestWorkAt.MESSAGES 332 && myWorkAt._success); 333 } 334 putJob(TestIWorkAt iWorkAt, boolean synchron, ThreadId threadId, String operation)335 private static void putJob(TestIWorkAt iWorkAt, boolean synchron, 336 ThreadId threadId, String operation) { 337 __iThreadPool.putJob( 338 new Job(iWorkAt, __iReceiver, 339 new Message( 340 threadId, operation != null, "oid", __workAt_td, 341 (operation == null 342 ? null 343 : ((MethodDescription) 344 __workAt_td.getMethodDescription(operation))), 345 synchron, null, false, null, null))); 346 } 347 348 private static final class TestThread extends Thread { 349 ThreadId _threadId; 350 Object _disposeId = new Object(); 351 String _message; 352 IThreadPool _iThreadPool; 353 TestThread()354 TestThread() { 355 this(__iThreadPool); 356 } 357 TestThread(IThreadPool iThreadPool)358 TestThread(IThreadPool iThreadPool) { 359 _iThreadPool = iThreadPool; 360 } 361 run()362 public void run() { 363 _threadId = _iThreadPool.getThreadId(); 364 365 366 try { 367 synchronized(this) { 368 // notify that we are running 369 notify(); 370 371 _iThreadPool.attach(); 372 373 // wait until we should continue 374 wait(); 375 } 376 377 _iThreadPool.enter(); 378 } 379 catch(Throwable throwable) { 380 _message = throwable.getMessage(); 381 } 382 383 _iThreadPool.detach(); 384 385 synchronized(this) { 386 // notify the listeners that we are dying 387 notifyAll(); 388 } 389 } 390 } 391 392 private static final class MyWorkAt implements TestIWorkAt { MyWorkAt( TestWorkAt async_WorkAt )393 public MyWorkAt( TestWorkAt async_WorkAt ) { 394 _async_WorkAt = async_WorkAt; 395 } 396 syncCall()397 public void syncCall() throws Throwable 398 { 399 Message iMessage = new Message( 400 __iThreadPool.getThreadId(), false, "oid", __workAt_td, null, 401 false, null, false, null, null); 402 403 // marshal reply 404 ThreadPool_Test.__iThreadPool.putJob( 405 new Job(this, ThreadPool_Test. __iReceiver, iMessage)); 406 } 407 asyncCall()408 public void asyncCall() throws Throwable { 409 for (int i = 0 ; i < 5 ; ++i) { 410 ThreadPool_Test.__iThreadPool.attach(); 411 ThreadPool_Test.putJob(this, true, __iThreadPool.getThreadId(), 412 "syncCall"); 413 // wait for reply 414 ThreadPool_Test.__iThreadPool.enter(); 415 ThreadPool_Test.__iThreadPool.detach(); 416 } 417 // async must have waited for this call 418 _success = _async_WorkAt._async_counter == 2; 419 } 420 increment()421 public void increment() throws Throwable {} 422 notifyme()423 public void notifyme() {} 424 425 public boolean _success = false; 426 427 private final TestWorkAt _async_WorkAt; 428 } 429 430 private static final IThreadPool __iThreadPool = ThreadPoolManager.create(); 431 private static final IReceiver __iReceiver = new TestReceiver(); 432 private static final TypeDescription __workAt_td 433 = TypeDescription.getTypeDescription(TestIWorkAt.class); 434 } 435