1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 package com.sun.star.lib.uno.environments.remote; 25 26 import com.sun.star.lang.DisposedException; 27 28 /** 29 * The <code>JobQueue</code> implements a queue for jobs. 30 * For every jobs thread id exists a job queue which is registered 31 * at the <code>ThreadPool</code>. 32 * A JobQueue is splitted in a sync job queue and an async job queue. 33 * The sync job queue is the registerd queue, it delegates async jobs 34 * (put by <code>putjob</code>) into the async queue, which is only 35 * known by the sync queue. 36 * <p> 37 * @version $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $ 38 * @author Kay Ramme 39 * @see com.sun.star.lib.uno.environments.remote.ThreadPool 40 * @see com.sun.star.lib.uno.environments.remote.Job 41 * @see com.sun.star.lib.uno.environments.remote.ThreadId 42 * @since UDK1.0 43 */ 44 public class JobQueue { 45 /** 46 * When set to true, enables various debugging output. 47 */ 48 private static final boolean DEBUG = false; 49 50 protected Job _head; // the head of the job list 51 protected Job _tail; // the tail of the job list 52 53 protected ThreadId _threadId; // the thread id of the queue 54 protected int _ref_count = 0; // the stack deepness 55 protected boolean _createThread; // create a worker thread, if needed 56 protected boolean _createThread_now; // create a worker thread, if needed 57 protected Thread _worker_thread; // the thread that does the jobs 58 59 protected Object _disposeId; // the active dispose id 60 protected Object _doDispose = null; 61 protected Throwable _throwable; 62 63 protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs 64 protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs 65 66 protected boolean _active = false; 67 68 protected JavaThreadPoolFactory _javaThreadPoolFactory; 69 70 /** 71 * A thread for dispatching jobs 72 */ 73 class JobDispatcher extends Thread { 74 Object _disposeId; 75 JobDispatcher(Object disposeId)76 JobDispatcher(Object disposeId) { 77 if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId); 78 79 _disposeId = disposeId; 80 } 81 getThreadId()82 ThreadId getThreadId() { 83 return _threadId; 84 } 85 run()86 public void run() { 87 if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread()); 88 89 try { 90 enter(2000, _disposeId); 91 } 92 catch(Throwable throwable) { 93 if(_head != null || _active) { // there was a job in progress, so give a stack 94 System.err.println(getClass().getName() + " - exception occurred:" + throwable); 95 throwable.printStackTrace(System.err); 96 } 97 } 98 finally { 99 release(); 100 } 101 102 if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId); 103 104 // try { 105 // Object object = new Object(); 106 // synchronized(object) { 107 // object.wait(); 108 // } 109 // } 110 // catch(InterruptedException interruptedException) { 111 // } 112 } 113 } 114 115 116 /** 117 * Constructs a async job queue with the given thread id 118 * which belongs to the given sync job queue. 119 * <p> 120 * @param threadId the thread id 121 * @param sync_jobQueue the sync queue this async queue belongs to 122 * @see com.sun.star.lib.uno.environments.remote.ThreadID 123 */ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId)124 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) { 125 _javaThreadPoolFactory = javaThreadPoolFactory; 126 _threadId = ThreadId.createFresh(); 127 128 _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId); 129 if(_sync_jobQueue == null) { 130 _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true); 131 _sync_jobQueue.acquire(); 132 } 133 134 _sync_jobQueue._async_jobQueue = this; 135 136 _createThread = true; 137 _createThread_now = true; 138 139 acquire(); 140 141 if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId); 142 } 143 144 /** 145 * Constructs a sync job queue with the given thread id and the given thread. 146 * <p> 147 * @param threadId the thread id 148 * @param createThread if true, the queue creates a worker thread if needed 149 * @see com.sun.star.lib.uno.environments.remote.ThreadID 150 */ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread)151 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){ 152 _javaThreadPoolFactory = javaThreadPoolFactory; 153 _threadId = threadId; 154 _createThread = createThread; 155 _createThread_now = createThread; 156 157 if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId + " " + createThread); 158 } 159 160 /** 161 * Gives the thread id of this queue 162 * <p> 163 * @return the thread id 164 * @see com.sun.star.lib.uno.environments.remote.ThreadID 165 */ getThreadId()166 ThreadId getThreadId() { 167 return _threadId; 168 } 169 acquire()170 synchronized void acquire() { 171 // add only synchronous queues . 172 if(_ref_count <= 0 && _sync_jobQueue == null ) 173 _javaThreadPoolFactory.addJobQueue(this); 174 175 ++ _ref_count; 176 } 177 release()178 synchronized void release() { 179 -- _ref_count; 180 181 if(_ref_count <= 0) { 182 // only synchronous queues needs to be removed . 183 if( _sync_jobQueue == null ) 184 _javaThreadPoolFactory.removeJobQueue(this); 185 186 187 if(_sync_jobQueue != null) { 188 _sync_jobQueue._async_jobQueue = null; 189 _sync_jobQueue.release(); 190 } 191 } 192 } 193 194 /** 195 * Removes a job from the queue. 196 * <p> 197 * @return a job or null if timed out 198 * @param waitTime the maximum amount of time to wait for a job 199 */ removeJob(int waitTime)200 private Job removeJob(int waitTime) { 201 if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId); 202 203 Job job = null; 204 synchronized (this) { 205 // wait max. waitTime time for a job to enter the queue 206 boolean waited = false; 207 while(_head == null && (waitTime == 0 || !waited)) { 208 if(_doDispose == _disposeId) { 209 _doDispose = null; 210 throw (DisposedException) 211 new DisposedException().initCause(_throwable); 212 } 213 214 // notify sync queues 215 notifyAll(); 216 217 try { 218 // wait for new job 219 wait(waitTime); 220 } 221 catch(InterruptedException interruptedException) { 222 throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 223 } 224 225 // signal that we have already waited once 226 waited = true; 227 } 228 229 230 if(_head != null) { 231 Job current = _head; 232 _head = _head._next; 233 234 if(_head == null) 235 _tail = null; 236 237 job = current; 238 _active = true; 239 } 240 } 241 242 // always wait for asynchron jobqueue to be finished ! 243 if(job != null && _async_jobQueue != null) { 244 synchronized(_async_jobQueue) { 245 // wait for async queue to be empty and last job to be done 246 while(_async_jobQueue._active || _async_jobQueue._head != null) { 247 if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread); 248 249 if(_doDispose == _disposeId) { 250 _doDispose = null; 251 throw (DisposedException) 252 new DisposedException().initCause(_throwable); 253 } 254 255 try { 256 _async_jobQueue.wait(); 257 } 258 catch(InterruptedException interruptedException) { 259 throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 260 } 261 } 262 } 263 } 264 265 return job; 266 } 267 268 /** 269 * Puts a job into the queue. 270 * <p> 271 * @param job the job 272 * @param disposeId a dispose id 273 */ putJob(Job job, Object disposeId)274 synchronized void putJob(Job job, Object disposeId) { 275 if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job); 276 277 if(_tail != null) 278 _tail._next = job; 279 else 280 _head = job; 281 282 _tail = job; 283 284 if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one 285 286 acquire(); 287 288 _createThread_now = false; 289 new JobDispatcher(disposeId).start(); 290 } 291 292 // always notify possible waiters 293 notifyAll(); 294 } 295 296 /** 297 * Enters the job queue. 298 * <p> 299 * @return the result of the final job (reply) 300 * @param disposeId a dispose id 301 */ enter(Object disposeId)302 Object enter(Object disposeId) throws Throwable { 303 return enter(0, disposeId); // wait infinitly 304 } 305 306 /** 307 * Enters the job queue. 308 * <p> 309 * @return the result of the final job (reply) 310 * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly) 311 * @param disposeId a dispose id 312 */ enter(int waitTime, Object disposeId)313 Object enter(int waitTime, Object disposeId) throws Throwable { 314 if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId); 315 316 boolean quit = false; 317 318 Object hold_disposeId = _disposeId; 319 _disposeId = disposeId; 320 321 Object result = null; 322 323 Thread hold_worker_thread = _worker_thread; 324 _worker_thread = Thread.currentThread(); 325 326 while(!quit) { 327 Job job = null; 328 329 try { 330 job = removeJob(waitTime); 331 332 if(job != null) { 333 try { 334 result = job.execute(); 335 } 336 finally { 337 _active = false; 338 } 339 340 if (!job.isRequest()) { 341 job.dispose(); 342 343 quit = true; 344 } 345 346 job = null; 347 } 348 else 349 quit = true; 350 351 352 } 353 finally { // ensure that this queue becomes disposed, if necessary 354 if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result); 355 356 synchronized(this) { 357 if(job != null || (quit && _head == null)) { 358 _worker_thread = hold_worker_thread; 359 360 _createThread_now = true; 361 362 _disposeId = hold_disposeId; 363 364 if(_sync_jobQueue != null) 365 notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting) 366 } 367 else 368 quit = false; 369 370 } 371 } 372 } 373 374 return result; 375 } 376 377 /** 378 * If the given disposeId is registered, 379 * interrups the worker thread. 380 * <p> 381 * @param disposeId the dispose id 382 */ dispose(Object disposeId, Throwable throwable)383 synchronized void dispose(Object disposeId, Throwable throwable) { 384 if(_sync_jobQueue == null) { // dispose only sync queues 385 _doDispose = disposeId; 386 _throwable = throwable; 387 388 // get thread out of wait and let it throw the throwable 389 if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread"); 390 391 notifyAll(); 392 } 393 } 394 } 395 396