xref: /trunk/main/cppu/source/threadpool/threadpool.cxx (revision cdf0e10c4e3984b49a9502b011690b615761d4a3)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_cppu.hxx"
30 #include <hash_set>
31 #include <stdio.h>
32 
33 #include <osl/diagnose.h>
34 #include <osl/mutex.hxx>
35 #include <osl/thread.h>
36 #include <rtl/instance.hxx>
37 
38 #include <uno/threadpool.h>
39 
40 #include "threadpool.hxx"
41 #include "thread.hxx"
42 
43 using namespace ::std;
44 using namespace ::osl;
45 
46 namespace cppu_threadpool
47 {
48     struct theDisposedCallerAdmin :
49         public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
50     {
51         DisposedCallerAdminHolder operator () () {
52             return DisposedCallerAdminHolder(new DisposedCallerAdmin());
53         }
54     };
55 
56     DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
57     {
58         return theDisposedCallerAdmin::get();
59     }
60 
61     DisposedCallerAdmin::~DisposedCallerAdmin()
62     {
63 #if OSL_DEBUG_LEVEL > 1
64         if( !m_lst.empty() )
65         {
66             printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
67         }
68 #endif
69     }
70 
71     void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
72     {
73         MutexGuard guard( m_mutex );
74         m_lst.push_back( nDisposeId );
75     }
76 
77     void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId )
78     {
79         MutexGuard guard( m_mutex );
80         for( DisposedCallerList::iterator ii = m_lst.begin() ;
81              ii != m_lst.end() ;
82              ++ ii )
83         {
84             if( (*ii) == nDisposeId )
85             {
86                 m_lst.erase( ii );
87                 break;
88             }
89         }
90     }
91 
92     sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
93     {
94         MutexGuard guard( m_mutex );
95         for( DisposedCallerList::iterator ii = m_lst.begin() ;
96              ii != m_lst.end() ;
97              ++ ii )
98         {
99             if( (*ii) == nDisposeId )
100             {
101                 return sal_True;
102             }
103         }
104         return sal_False;
105     }
106 
107 
108     //-------------------------------------------------------------------------------
109 
110     struct theThreadPool :
111         public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
112     {
113         ThreadPoolHolder operator () () {
114             ThreadPoolHolder aRet(new ThreadPool());
115             return aRet;
116         }
117     };
118 
119     ThreadPool::ThreadPool()
120     {
121             m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
122     }
123 
124     ThreadPool::~ThreadPool()
125     {
126 #if OSL_DEBUG_LEVEL > 1
127         if( m_mapQueue.size() )
128         {
129             printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
130         }
131 #endif
132     }
133     ThreadPoolHolder ThreadPool::getInstance()
134     {
135         return theThreadPool::get();
136     }
137 
138 
139     void ThreadPool::dispose( sal_Int64 nDisposeId )
140     {
141         if( nDisposeId )
142         {
143             m_DisposedCallerAdmin->dispose( nDisposeId );
144 
145             MutexGuard guard( m_mutex );
146             for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
147                  ii != m_mapQueue.end();
148                  ++ii)
149             {
150                 if( (*ii).second.first )
151                 {
152                     (*ii).second.first->dispose( nDisposeId );
153                 }
154                 if( (*ii).second.second )
155                 {
156                     (*ii).second.second->dispose( nDisposeId );
157                 }
158             }
159         }
160         else
161         {
162             {
163                 MutexGuard guard( m_mutexWaitingThreadList );
164                 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
165                      ii != m_lstThreads.end() ;
166                      ++ ii )
167                 {
168                     // wake the threads up
169                     osl_setCondition( (*ii)->condition );
170                 }
171             }
172             ThreadAdmin::getInstance()->join();
173         }
174     }
175 
176     void ThreadPool::stopDisposing( sal_Int64 nDisposeId )
177     {
178         m_DisposedCallerAdmin->stopDisposing( nDisposeId );
179     }
180 
181     /******************
182      * This methods lets the thread wait a certain amount of time. If within this timespan
183      * a new request comes in, this thread is reused. This is done only to improve performance,
184      * it is not required for threadpool functionality.
185      ******************/
186     void ThreadPool::waitInPool( ORequestThread * pThread )
187     {
188         struct WaitingThread waitingThread;
189         waitingThread.condition = osl_createCondition();
190         waitingThread.thread = pThread;
191         {
192             MutexGuard guard( m_mutexWaitingThreadList );
193             m_lstThreads.push_front( &waitingThread );
194         }
195 
196         // let the thread wait 2 seconds
197         TimeValue time = { 2 , 0 };
198         osl_waitCondition( waitingThread.condition , &time );
199 
200         {
201             MutexGuard guard ( m_mutexWaitingThreadList );
202             if( waitingThread.thread )
203             {
204                 // thread wasn't reused, remove it from the list
205                 WaitingThreadList::iterator ii = find(
206                     m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
207                 OSL_ASSERT( ii != m_lstThreads.end() );
208                 m_lstThreads.erase( ii );
209             }
210         }
211 
212         osl_destroyCondition( waitingThread.condition );
213     }
214 
215     void ThreadPool::createThread( JobQueue *pQueue ,
216                                    const ByteSequence &aThreadId,
217                                    sal_Bool bAsynchron )
218     {
219         sal_Bool bCreate = sal_True;
220         {
221             // Can a thread be reused ?
222             MutexGuard guard( m_mutexWaitingThreadList );
223             if( ! m_lstThreads.empty() )
224             {
225                 // inform the thread and let it go
226                 struct WaitingThread *pWaitingThread = m_lstThreads.back();
227                 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
228                 pWaitingThread->thread = 0;
229 
230                 // remove from list
231                 m_lstThreads.pop_back();
232 
233                 // let the thread go
234                 osl_setCondition( pWaitingThread->condition );
235                 bCreate = sal_False;
236             }
237         }
238 
239         if( bCreate )
240         {
241             ORequestThread *pThread =
242                 new ORequestThread( pQueue , aThreadId, bAsynchron);
243             // deletes itself !
244             pThread->create();
245         }
246     }
247 
248     sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
249     {
250         MutexGuard guard( m_mutex );
251 
252         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
253         OSL_ASSERT( ii != m_mapQueue.end() );
254 
255         if( bAsynchron )
256         {
257             if( ! (*ii).second.second->isEmpty() )
258             {
259                 // another thread has put something into the queue
260                 return sal_False;
261             }
262 
263             (*ii).second.second = 0;
264             if( (*ii).second.first )
265             {
266                 // all oneway request have been processed, now
267                 // synchronus requests may go on
268                 (*ii).second.first->resume();
269             }
270         }
271         else
272         {
273             if( ! (*ii).second.first->isEmpty() )
274             {
275                 // another thread has put something into the queue
276                 return sal_False;
277             }
278             (*ii).second.first = 0;
279         }
280 
281         if( 0 == (*ii).second.first && 0 == (*ii).second.second )
282         {
283             m_mapQueue.erase( ii );
284         }
285 
286         return sal_True;
287     }
288 
289 
290     void ThreadPool::addJob(
291         const ByteSequence &aThreadId ,
292         sal_Bool bAsynchron,
293         void *pThreadSpecificData,
294         RequestFun * doRequest )
295     {
296         sal_Bool bCreateThread = sal_False;
297         JobQueue *pQueue = 0;
298         {
299             MutexGuard guard( m_mutex );
300 
301             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
302 
303             if( ii == m_mapQueue.end() )
304             {
305                 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 );
306                 ii = m_mapQueue.find( aThreadId );
307                 OSL_ASSERT( ii != m_mapQueue.end() );
308             }
309 
310             if( bAsynchron )
311             {
312                 if( ! (*ii).second.second )
313                 {
314                     (*ii).second.second = new JobQueue();
315                     bCreateThread = sal_True;
316                 }
317                 pQueue = (*ii).second.second;
318             }
319             else
320             {
321                 if( ! (*ii).second.first )
322                 {
323                     (*ii).second.first = new JobQueue();
324                     bCreateThread = sal_True;
325                 }
326                 pQueue = (*ii).second.first;
327 
328                 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
329                 {
330                     pQueue->suspend();
331                 }
332             }
333             pQueue->add( pThreadSpecificData , doRequest );
334         }
335 
336         if( bCreateThread )
337         {
338             createThread( pQueue , aThreadId , bAsynchron);
339         }
340     }
341 
342     void ThreadPool::prepare( const ByteSequence &aThreadId )
343     {
344         MutexGuard guard( m_mutex );
345 
346         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
347 
348         if( ii == m_mapQueue.end() )
349         {
350             JobQueue *p = new JobQueue();
351             m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 );
352         }
353         else if( 0 == (*ii).second.first )
354         {
355             (*ii).second.first = new JobQueue();
356         }
357     }
358 
359     void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
360     {
361         JobQueue *pQueue = 0;
362         {
363             MutexGuard guard( m_mutex );
364 
365             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
366 
367             OSL_ASSERT( ii != m_mapQueue.end() );
368             pQueue = (*ii).second.first;
369         }
370 
371         OSL_ASSERT( pQueue );
372         void *pReturn = pQueue->enter( nDisposeId );
373 
374         if( pQueue->isCallstackEmpty() )
375         {
376             if( revokeQueue( aThreadId , sal_False) )
377             {
378                 // remove queue
379                 delete pQueue;
380             }
381         }
382         return pReturn;
383     }
384 }
385 
386 
387 using namespace cppu_threadpool;
388 
389 struct uno_ThreadPool_Equal
390 {
391     sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
392         {
393             return a == b;
394         }
395 };
396 
397 struct uno_ThreadPool_Hash
398 {
399     sal_Size operator () ( const uno_ThreadPool &a  )  const
400         {
401             return (sal_Size) a;
402         }
403 };
404 
405 
406 
407 typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
408 
409 static ThreadpoolHashSet *g_pThreadpoolHashSet;
410 
// Object behind a uno_ThreadPool handle. uno_threadpool_create() allocates
// one per handle so that the handle value is unique in the process; the
// member is never read or written in this file.
struct _uno_ThreadPool
{
    sal_Int32 dummy;
};
415 
416 extern "C" uno_ThreadPool SAL_CALL
417 uno_threadpool_create() SAL_THROW_EXTERN_C()
418 {
419     MutexGuard guard( Mutex::getGlobalMutex() );
420     if( ! g_pThreadpoolHashSet )
421     {
422         g_pThreadpoolHashSet = new ThreadpoolHashSet();
423     }
424 
425     // Just ensure that the handle is unique in the process (via heap)
426     uno_ThreadPool h = new struct _uno_ThreadPool;
427     g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) );
428     return h;
429 }
430 
431 extern "C" void SAL_CALL
432 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
433 {
434     sal_Sequence *pThreadId = 0;
435     uno_getIdOfCurrentThread( &pThreadId );
436     ThreadPool::getInstance()->prepare( pThreadId );
437     rtl_byte_sequence_release( pThreadId );
438     uno_releaseIdFromCurrentThread();
439 }
440 
441 extern "C" void SAL_CALL
442 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
443     SAL_THROW_EXTERN_C()
444 {
445     sal_Sequence *pThreadId = 0;
446     uno_getIdOfCurrentThread( &pThreadId );
447     *ppJob =
448         ThreadPool::getInstance()->enter(
449             pThreadId,
450             sal::static_int_cast< sal_Int64 >(
451                 reinterpret_cast< sal_IntPtr >(hPool)) );
452     rtl_byte_sequence_release( pThreadId );
453     uno_releaseIdFromCurrentThread();
454 }
455 
// Counterpart of uno_threadpool_attach(); currently does nothing and
// ignores its handle argument.
extern "C" void SAL_CALL
uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
{
    // we might do some tidying up here in case a thread called attach but never detach
}
461 
462 extern "C" void SAL_CALL
463 uno_threadpool_putJob(
464     uno_ThreadPool,
465     sal_Sequence *pThreadId,
466     void *pJob,
467     void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
468     sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
469 {
470     ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest );
471 }
472 
473 extern "C" void SAL_CALL
474 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
475 {
476     ThreadPool::getInstance()->dispose(
477         sal::static_int_cast< sal_Int64 >(
478             reinterpret_cast< sal_IntPtr >(hPool)) );
479 }
480 
481 extern "C" void SAL_CALL
482 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
483 {
484     ThreadPool::getInstance()->stopDisposing(
485         sal::static_int_cast< sal_Int64 >(
486             reinterpret_cast< sal_IntPtr >(hPool)) );
487 
488     if( hPool )
489     {
490         // special treatment for 0 !
491         OSL_ASSERT( g_pThreadpoolHashSet );
492 
493         MutexGuard guard( Mutex::getGlobalMutex() );
494 
495         ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
496         OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
497         g_pThreadpoolHashSet->erase( ii );
498         delete hPool;
499 
500         if( g_pThreadpoolHashSet->empty() )
501         {
502             delete g_pThreadpoolHashSet;
503             g_pThreadpoolHashSet = 0;
504         }
505     }
506 }
507