xref: /AOO41X/main/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java (revision cdf0e10c4e3984b49a9502b011690b615761d4a3)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 package com.sun.star.lib.uno.environments.remote;
29 
30 import com.sun.star.lang.DisposedException;
31 
32 /**
33  * The <code>JobQueue</code> implements a queue for jobs.
34  * For every jobs thread id exists a job queue which is registered
35  * at the <code>ThreadPool</code>.
36  * A JobQueue is splitted in a sync job queue and an async job queue.
37  * The sync job queue is the registerd queue, it delegates async jobs
38  * (put by <code>putjob</code>) into the async queue, which is only
39  * known by the sync queue.
40  * <p>
41  * @version     $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $
42  * @author      Kay Ramme
43  * @see         com.sun.star.lib.uno.environments.remote.ThreadPool
44  * @see         com.sun.star.lib.uno.environments.remote.Job
45  * @see         com.sun.star.lib.uno.environments.remote.ThreadID
46  * @since       UDK1.0
47  */
48 public class JobQueue {
49     /**
50      * When set to true, enables various debugging output.
51      */
52     private static final boolean DEBUG = false;
53 
54     protected Job _head;                 // the head of the job list
55     protected Job _tail;                 // the tail of the job list
56 
57     protected ThreadId  _threadId;       // the thread id of the queue
58     protected int       _ref_count = 0;  // the stack deepness
59     protected boolean   _createThread;   // create a worker thread, if needed
60     protected boolean   _createThread_now;   // create a worker thread, if needed
61     protected Thread    _worker_thread;  // the thread that does the jobs
62 
63     protected Object    _disposeId; // the active dispose id
64     protected Object    _doDispose = null;
65     protected Throwable _throwable;
66 
67     protected JobQueue  _async_jobQueue; // chaining job qeueus for asyncs
68     protected JobQueue  _sync_jobQueue;  // chaining job qeueus for syncs
69 
70     protected boolean _active = false;
71 
72     protected JavaThreadPoolFactory _javaThreadPoolFactory;
73 
74     /**
75      * A thread for dispatching jobs
76      */
77     class JobDispatcher extends Thread {
78         Object _disposeId;
79 
80         JobDispatcher(Object disposeId) {
81             if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
82 
83             _disposeId = disposeId;
84         }
85 
86         ThreadId getThreadId() {
87             return _threadId;
88         }
89 
90         public void run() {
91             if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
92 
93             try {
94                 enter(2000, _disposeId);
95             }
96             catch(Throwable throwable) {
97                 if(_head != null || _active) { // there was a job in progress, so give a stack
98                     System.err.println(getClass().getName() + " - exception occurred:" + throwable);
99                     throwable.printStackTrace(System.err);
100                 }
101             }
102             finally {
103                 release();
104             }
105 
106             if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
107 
108 //              try {
109 //                  Object object = new Object();
110 //                  synchronized(object) {
111 //                      object.wait();
112 //                  }
113 //              }
114 //              catch(InterruptedException interruptedException) {
115 //              }
116         }
117     }
118 
119 
120     /**
121      * Constructs a async job queue with the given thread id
122      * which belongs to the given sync job queue.
123      * <p>
124      * @param threadId         the thread id
125      * @param sync_jobQueue    the sync queue this async queue belongs to
126      * @see                    com.sun.star.lib.uno.environments.remote.ThreadID
127      */
128     JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
129         _javaThreadPoolFactory = javaThreadPoolFactory;
130         _threadId = ThreadId.createFresh();
131 
132         _sync_jobQueue    = javaThreadPoolFactory.getJobQueue(threadId);
133         if(_sync_jobQueue == null) {
134             _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
135             _sync_jobQueue.acquire();
136         }
137 
138         _sync_jobQueue._async_jobQueue = this;
139 
140         _createThread     = true;
141         _createThread_now = true;
142 
143         acquire();
144 
145         if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId);
146     }
147 
148     /**
149      * Constructs a sync job queue with the given thread id and the given thread.
150      * <p>
151      * @param threadId        the thread id
152      * @param createThread    if true, the queue creates a worker thread if needed
153      * @see             com.sun.star.lib.uno.environments.remote.ThreadID
154      */
155     JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){
156         _javaThreadPoolFactory   = javaThreadPoolFactory;
157         _threadId         = threadId;
158         _createThread     = createThread;
159         _createThread_now = createThread;
160 
161         if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId + " " + createThread);
162     }
163 
164     /**
165      * Gives the thread id of this queue
166      * <p>
167      * @return  the thread id
168      * @see     com.sun.star.lib.uno.environments.remote.ThreadID
169      */
170     ThreadId getThreadId() {
171         return _threadId;
172     }
173 
174     synchronized void acquire() {
175         // add only synchronous queues .
176         if(_ref_count <= 0 && _sync_jobQueue == null )
177             _javaThreadPoolFactory.addJobQueue(this);
178 
179         ++ _ref_count;
180     }
181 
182     synchronized void release() {
183         -- _ref_count;
184 
185         if(_ref_count <= 0) {
186             // only synchronous queues needs to be removed .
187             if( _sync_jobQueue == null )
188                 _javaThreadPoolFactory.removeJobQueue(this);
189 
190 
191             if(_sync_jobQueue != null) {
192                 _sync_jobQueue._async_jobQueue = null;
193                 _sync_jobQueue.release();
194             }
195         }
196     }
197 
198     /**
199      * Removes a job from the queue.
200      * <p>
201      * @return a job or null if timed out
202      * @param  waitTime        the maximum amount of time to wait for a job
203      */
204     private Job removeJob(int waitTime) {
205         if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
206 
207         Job job = null;
208         synchronized (this) {
209             // wait max. waitTime time for a job to enter the queue
210             boolean waited = false;
211             while(_head == null && (waitTime == 0 || !waited)) {
212                 if(_doDispose == _disposeId) {
213                     _doDispose = null;
214                     throw (DisposedException)
215                         new DisposedException().initCause(_throwable);
216                 }
217 
218                 // notify sync queues
219                 notifyAll();
220 
221                 try {
222                     // wait for new job
223                     wait(waitTime);
224                 }
225                 catch(InterruptedException interruptedException) {
226                     throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
227                 }
228 
229                 // signal that we have already waited once
230                 waited = true;
231             }
232 
233 
234             if(_head != null) {
235                 Job current = _head;
236                 _head    = _head._next;
237 
238                 if(_head == null)
239                     _tail = null;
240 
241                 job = current;
242                 _active = true;
243             }
244         }
245 
246         // always wait for asynchron jobqueue to be finished !
247         if(job != null && _async_jobQueue != null) {
248             synchronized(_async_jobQueue) {
249                 // wait for async queue to be empty and last job to be done
250                 while(_async_jobQueue._active || _async_jobQueue._head != null) {
251                     if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " +  _async_jobQueue._worker_thread);
252 
253                     if(_doDispose == _disposeId) {
254                         _doDispose = null;
255                         throw (DisposedException)
256                             new DisposedException().initCause(_throwable);
257                     }
258 
259                     try {
260                         _async_jobQueue.wait();
261                     }
262                     catch(InterruptedException interruptedException) {
263                         throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
264                     }
265                 }
266             }
267         }
268 
269         return job;
270     }
271 
272     /**
273      * Puts a job into the queue.
274      * <p>
275      * @param  job        the job
276      * @param  disposeId  a dispose id
277      */
278     synchronized void putJob(Job job, Object disposeId) {
279         if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
280 
281         if(_tail != null)
282             _tail._next = job;
283         else
284             _head = job;
285 
286         _tail = job;
287 
288         if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
289 
290             acquire();
291 
292             _createThread_now = false;
293             new JobDispatcher(disposeId).start();
294         }
295 
296         // always notify possible waiters
297         notifyAll();
298     }
299 
300     /**
301      * Enters the job queue.
302      * <p>
303      * @return the result of the final job (reply)
304      * @param  disposeId  a dispose id
305      */
306     Object enter(Object disposeId) throws Throwable {
307         return enter(0, disposeId); // wait infinitly
308     }
309 
310     /**
311      * Enters the job queue.
312      * <p>
313      * @return the result of the final job (reply)
314      * @param  waitTime   the maximum amount of time to wait for a job (0 means wait infinitly)
315      * @param  disposeId  a dispose id
316      */
317     Object enter(int waitTime, Object disposeId) throws Throwable {
318         if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId);
319 
320         boolean quit = false;
321 
322         Object hold_disposeId = _disposeId;
323         _disposeId = disposeId;
324 
325         Object result = null;
326 
327         Thread hold_worker_thread = _worker_thread;
328         _worker_thread = Thread.currentThread();
329 
330         while(!quit) {
331             Job job = null;
332 
333             try {
334                 job = removeJob(waitTime);
335 
336                 if(job != null) {
337                     try {
338                         result = job.execute();
339                     }
340                     finally {
341                         _active = false;
342                     }
343 
344                     if (!job.isRequest()) {
345                         job.dispose();
346 
347                         quit = true;
348                     }
349 
350                     job = null;
351                 }
352                 else
353                     quit = true;
354 
355 
356             }
357             finally { // ensure that this queue becomes disposed, if necessary
358                 if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result);
359 
360                 synchronized(this) {
361                     if(job != null || (quit && _head == null)) {
362                         _worker_thread = hold_worker_thread;
363 
364                         _createThread_now = true;
365 
366                         _disposeId = hold_disposeId;
367 
368                         if(_sync_jobQueue != null)
369                             notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
370                     }
371                     else
372                         quit = false;
373 
374                 }
375             }
376         }
377 
378         return result;
379     }
380 
381     /**
382      * If the given disposeId is registered,
383      * interrups the worker thread.
384      * <p>
385      * @param disposeId    the dispose id
386      */
387     synchronized void dispose(Object disposeId, Throwable throwable) {
388         if(_sync_jobQueue == null) { // dispose only sync queues
389             _doDispose = disposeId;
390             _throwable = throwable;
391 
392             // get thread out of wait and let it throw the throwable
393             if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
394 
395             notifyAll();
396         }
397     }
398 }
399 
400