1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 package com.sun.star.lib.uno.environments.remote;
25 
26 import com.sun.star.lang.DisposedException;
27 
/**
 * The <code>JobQueue</code> implements a queue for jobs.
 * For every job's thread id there exists a job queue which is registered
 * at the <code>ThreadPool</code>.
 * A JobQueue is split into a sync job queue and an async job queue.
 * The sync job queue is the registered queue; it delegates async jobs
 * (put by <code>putJob</code>) into the async queue, which is only
 * known by the sync queue.
 * <p>
 * @version 	$Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $
 * @author 	    Kay Ramme
 * @see         com.sun.star.lib.uno.environments.remote.ThreadPool
 * @see         com.sun.star.lib.uno.environments.remote.Job
 * @see         com.sun.star.lib.uno.environments.remote.ThreadId
 * @since       UDK1.0
 */
44 public class JobQueue {
45 	/**
46 	 * When set to true, enables various debugging output.
47 	 */
48 	private static final boolean DEBUG = false;
49 
50 	protected Job _head;                 // the head of the job list
51 	protected Job _tail;                 // the tail of the job list
52 
53 	protected ThreadId  _threadId;       // the thread id of the queue
54 	protected int       _ref_count = 0;  // the stack deepness
55 	protected boolean   _createThread;   // create a worker thread, if needed
56 	protected boolean   _createThread_now;   // create a worker thread, if needed
57 	protected Thread    _worker_thread;  // the thread that does the jobs
58 
59 	protected Object    _disposeId; // the active dispose id
60 	protected Object    _doDispose = null;
61 	protected Throwable _throwable;
62 
63 	protected JobQueue  _async_jobQueue; // chaining job qeueus for asyncs
64 	protected JobQueue  _sync_jobQueue;  // chaining job qeueus for syncs
65 
66 	protected boolean _active = false;
67 
68 	protected JavaThreadPoolFactory _javaThreadPoolFactory;
69 
70 	/**
71 	 * A thread for dispatching jobs
72 	 */
73 	class JobDispatcher extends Thread {
74 		Object _disposeId;
75 
JobDispatcher(Object disposeId)76 		JobDispatcher(Object disposeId) {
77 			if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
78 
79 			_disposeId = disposeId;
80 		}
81 
getThreadId()82 		ThreadId getThreadId() {
83 			return _threadId;
84 		}
85 
run()86 		public void run() {
87 			if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
88 
89 			try {
90   				enter(2000, _disposeId);
91 			}
92 			catch(Throwable throwable) {
93 				if(_head != null || _active) { // there was a job in progress, so give a stack
94 					System.err.println(getClass().getName() + " - exception occurred:" + throwable);
95 					throwable.printStackTrace(System.err);
96 				}
97 			}
98 			finally {
99 				release();
100 			}
101 
102 			if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
103 
104 //  			try {
105 //  				Object object = new Object();
106 //  				synchronized(object) {
107 //  					object.wait();
108 //  				}
109 //  			}
110 //  			catch(InterruptedException interruptedException) {
111 //  			}
112 		}
113 	}
114 
115 
116 	/**
117 	 * Constructs a async job queue with the given thread id
118 	 * which belongs to the given sync job queue.
119 	 * <p>
120 	 * @param threadId         the thread id
121 	 * @param sync_jobQueue    the sync queue this async queue belongs to
122 	 * @see                    com.sun.star.lib.uno.environments.remote.ThreadID
123 	 */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId)124 	JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
125 		_javaThreadPoolFactory = javaThreadPoolFactory;
126         _threadId = ThreadId.createFresh();
127 
128 		_sync_jobQueue    = javaThreadPoolFactory.getJobQueue(threadId);
129 		if(_sync_jobQueue == null) {
130 			_sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
131 			_sync_jobQueue.acquire();
132 		}
133 
134 		_sync_jobQueue._async_jobQueue = this;
135 
136 		_createThread     = true;
137 		_createThread_now = true;
138 
139 		acquire();
140 
141 		if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId);
142 	}
143 
144 	/**
145 	 * Constructs a sync job queue with the given thread id and the given thread.
146 	 * <p>
147 	 * @param threadId        the thread id
148 	 * @param createThread    if true, the queue creates a worker thread if needed
149 	 * @see             com.sun.star.lib.uno.environments.remote.ThreadID
150 	 */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread)151 	JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){
152 		_javaThreadPoolFactory   = javaThreadPoolFactory;
153 		_threadId         = threadId;
154 		_createThread     = createThread;
155 		_createThread_now = createThread;
156 
157 		if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId + " " + createThread);
158 	}
159 
160 	/**
161 	 * Gives the thread id of this queue
162 	 * <p>
163 	 * @return  the thread id
164 	 * @see     com.sun.star.lib.uno.environments.remote.ThreadID
165 	 */
getThreadId()166 	ThreadId getThreadId() {
167 		return _threadId;
168 	}
169 
acquire()170 	synchronized void acquire() {
171         // add only synchronous queues .
172 		if(_ref_count <= 0 && _sync_jobQueue == null )
173 			_javaThreadPoolFactory.addJobQueue(this);
174 
175 		++ _ref_count;
176 	}
177 
release()178 	synchronized void release() {
179 		-- _ref_count;
180 
181 		if(_ref_count <= 0) {
182             // only synchronous queues needs to be removed .
183             if( _sync_jobQueue == null )
184                 _javaThreadPoolFactory.removeJobQueue(this);
185 
186 
187 			if(_sync_jobQueue != null) {
188 				_sync_jobQueue._async_jobQueue = null;
189 				_sync_jobQueue.release();
190 			}
191 		}
192 	}
193 
194 	/**
195 	 * Removes a job from the queue.
196 	 * <p>
197 	 * @return a job or null if timed out
198 	 * @param  waitTime        the maximum amount of time to wait for a job
199 	 */
removeJob(int waitTime)200 	private Job removeJob(int waitTime) {
201 		if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
202 
203 		Job job = null;
204 		synchronized (this) {
205 			// wait max. waitTime time for a job to enter the queue
206 			boolean waited = false;
207 			while(_head == null && (waitTime == 0 || !waited)) {
208 				if(_doDispose == _disposeId) {
209 					_doDispose = null;
210 					throw (DisposedException)
211                         new DisposedException().initCause(_throwable);
212 				}
213 
214 				// notify sync queues
215 				notifyAll();
216 
217 				try {
218 					// wait for new job
219 					wait(waitTime);
220 				}
221 				catch(InterruptedException interruptedException) {
222   					throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
223 				}
224 
225 				// signal that we have already waited once
226 				waited = true;
227 			}
228 
229 
230 			if(_head != null) {
231 				Job current = _head;
232 				_head    = _head._next;
233 
234 				if(_head == null)
235 					_tail = null;
236 
237 				job = current;
238 				_active = true;
239 			}
240 		}
241 
242 		// always wait for asynchron jobqueue to be finished !
243 		if(job != null && _async_jobQueue != null) {
244 			synchronized(_async_jobQueue) {
245 				// wait for async queue to be empty and last job to be done
246 				while(_async_jobQueue._active || _async_jobQueue._head != null) {
247 					if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " +  _async_jobQueue._worker_thread);
248 
249 					if(_doDispose == _disposeId) {
250 						_doDispose = null;
251 						throw (DisposedException)
252                             new DisposedException().initCause(_throwable);
253 					}
254 
255 					try {
256 						_async_jobQueue.wait();
257 					}
258 					catch(InterruptedException interruptedException) {
259 						throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
260 					}
261 				}
262 			}
263 		}
264 
265 		return job;
266 	}
267 
268 	/**
269 	 * Puts a job into the queue.
270 	 * <p>
271 	 * @param  job        the job
272 	 * @param  disposeId  a dispose id
273 	 */
putJob(Job job, Object disposeId)274 	synchronized void putJob(Job job, Object disposeId) {
275 		if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
276 
277 		if(_tail != null)
278 			_tail._next = job;
279 		else
280 			_head = job;
281 
282 		_tail = job;
283 
284 		if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
285 
286 			acquire();
287 
288 			_createThread_now = false;
289 			new JobDispatcher(disposeId).start();
290 		}
291 
292 		// always notify possible waiters
293 		notifyAll();
294 	}
295 
296 	/**
297 	 * Enters the job queue.
298 	 * <p>
299 	 * @return the result of the final job (reply)
300 	 * @param  disposeId  a dispose id
301 	 */
enter(Object disposeId)302 	Object enter(Object disposeId) throws Throwable {
303 		return enter(0, disposeId); // wait infinitly
304 	}
305 
306 	/**
307 	 * Enters the job queue.
308 	 * <p>
309 	 * @return the result of the final job (reply)
310 	 * @param  waitTime   the maximum amount of time to wait for a job (0 means wait infinitly)
311 	 * @param  disposeId  a dispose id
312 	 */
enter(int waitTime, Object disposeId)313 	Object enter(int waitTime, Object disposeId) throws Throwable {
314 		if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId);
315 
316 		boolean quit = false;
317 
318 		Object hold_disposeId = _disposeId;
319 		_disposeId = disposeId;
320 
321 		Object result = null;
322 
323 		Thread hold_worker_thread = _worker_thread;
324 		_worker_thread = Thread.currentThread();
325 
326 		while(!quit) {
327 			Job job = null;
328 
329 			try {
330 				job = removeJob(waitTime);
331 
332 				if(job != null) {
333 					try {
334 						result = job.execute();
335 					}
336 					finally {
337 						_active = false;
338 					}
339 
340 					if (!job.isRequest()) {
341 						job.dispose();
342 
343 						quit = true;
344 					}
345 
346 					job = null;
347 				}
348 				else
349 					quit = true;
350 
351 
352 			}
353 			finally { // ensure that this queue becomes disposed, if necessary
354 				if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result);
355 
356 				synchronized(this) {
357 					if(job != null || (quit && _head == null)) {
358 						_worker_thread = hold_worker_thread;
359 
360 						_createThread_now = true;
361 
362 						_disposeId = hold_disposeId;
363 
364 						if(_sync_jobQueue != null)
365 							notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
366 					}
367 					else
368 						quit = false;
369 
370 				}
371 			}
372 		}
373 
374 		return result;
375 	}
376 
377 	/**
378 	 * If the given disposeId is registered,
379 	 * interrups the worker thread.
380 	 * <p>
381 	 * @param disposeId    the dispose id
382 	 */
dispose(Object disposeId, Throwable throwable)383 	synchronized void dispose(Object disposeId, Throwable throwable) {
384 		if(_sync_jobQueue == null) { // dispose only sync queues
385 			_doDispose = disposeId;
386 			_throwable = throwable;
387 
388 			// get thread out of wait and let it throw the throwable
389 			if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
390 
391 			notifyAll();
392 		}
393 	}
394 }
395 
396