1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_cppu.hxx"
30 #include <hash_set>
31 #include <stdio.h>
32 
33 #include <osl/diagnose.h>
34 #include <osl/mutex.hxx>
35 #include <osl/thread.h>
36 #include <rtl/instance.hxx>
37 
38 #include <uno/threadpool.h>
39 
40 #include "threadpool.hxx"
41 #include "thread.hxx"
42 
43 using namespace ::std;
44 using namespace ::osl;
45 
46 namespace cppu_threadpool
47 {
48 	struct theDisposedCallerAdmin :
49 		public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
50 	{
51 		DisposedCallerAdminHolder operator () () {
52 			return DisposedCallerAdminHolder(new DisposedCallerAdmin());
53 		}
54 	};
55 
56 	DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
57 	{
58 		return theDisposedCallerAdmin::get();
59 	}
60 
61 	DisposedCallerAdmin::~DisposedCallerAdmin()
62 	{
63 #if OSL_DEBUG_LEVEL > 1
64 		if( !m_lst.empty() )
65 		{
66 			printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
67 		}
68 #endif
69 	}
70 
71 	void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
72 	{
73 		MutexGuard guard( m_mutex );
74 		m_lst.push_back( nDisposeId );
75 	}
76 
77 	void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId )
78 	{
79 		MutexGuard guard( m_mutex );
80 		for( DisposedCallerList::iterator ii = m_lst.begin() ;
81 			 ii != m_lst.end() ;
82 			 ++ ii )
83 		{
84 			if( (*ii) == nDisposeId )
85 			{
86 				m_lst.erase( ii );
87 				break;
88 			}
89 		}
90 	}
91 
92 	sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
93 	{
94 		MutexGuard guard( m_mutex );
95 		for( DisposedCallerList::iterator ii = m_lst.begin() ;
96 			 ii != m_lst.end() ;
97 			 ++ ii )
98 		{
99 			if( (*ii) == nDisposeId )
100 			{
101 				return sal_True;
102 			}
103 		}
104 		return sal_False;
105 	}
106 
107 
108 	//-------------------------------------------------------------------------------
109 
110 	struct theThreadPool :
111 		public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
112 	{
113 		ThreadPoolHolder operator () () {
114 			ThreadPoolHolder aRet(new ThreadPool());
115 			return aRet;
116 		}
117 	};
118 
119 	ThreadPool::ThreadPool()
120 	{
121         	m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
122 	}
123 
124 	ThreadPool::~ThreadPool()
125 	{
126 #if OSL_DEBUG_LEVEL > 1
127 		if( m_mapQueue.size() )
128 		{
129 			printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
130 		}
131 #endif
132 	}
133 	ThreadPoolHolder ThreadPool::getInstance()
134 	{
135 		return theThreadPool::get();
136 	}
137 
138 
139 	void ThreadPool::dispose( sal_Int64 nDisposeId )
140 	{
141 		if( nDisposeId )
142 		{
143 			m_DisposedCallerAdmin->dispose( nDisposeId );
144 
145 			MutexGuard guard( m_mutex );
146 			for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
147 				 ii != m_mapQueue.end();
148 				 ++ii)
149 			{
150 				if( (*ii).second.first )
151 				{
152 					(*ii).second.first->dispose( nDisposeId );
153 				}
154 				if( (*ii).second.second )
155 				{
156 					(*ii).second.second->dispose( nDisposeId );
157 				}
158 			}
159 		}
160 		else
161 		{
162 			{
163 				MutexGuard guard( m_mutexWaitingThreadList );
164 				for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
165 					 ii != m_lstThreads.end() ;
166 					 ++ ii )
167 				{
168 					// wake the threads up
169 					osl_setCondition( (*ii)->condition );
170 				}
171 			}
172 			ThreadAdmin::getInstance()->join();
173 		}
174 	}
175 
176 	void ThreadPool::stopDisposing( sal_Int64 nDisposeId )
177 	{
178 		m_DisposedCallerAdmin->stopDisposing( nDisposeId );
179 	}
180 
181 	/******************
182 	 * This methods lets the thread wait a certain amount of time. If within this timespan
183 	 * a new request comes in, this thread is reused. This is done only to improve performance,
184 	 * it is not required for threadpool functionality.
185 	 ******************/
186 	void ThreadPool::waitInPool( ORequestThread * pThread )
187 	{
188 		struct WaitingThread waitingThread;
189 		waitingThread.condition = osl_createCondition();
190 		waitingThread.thread = pThread;
191 		{
192 			MutexGuard guard( m_mutexWaitingThreadList );
193 			m_lstThreads.push_front( &waitingThread );
194 		}
195 
196 		// let the thread wait 2 seconds
197 		TimeValue time = { 2 , 0 };
198 		osl_waitCondition( waitingThread.condition , &time );
199 
200 		{
201 			MutexGuard guard ( m_mutexWaitingThreadList );
202 			if( waitingThread.thread )
203 			{
204 				// thread wasn't reused, remove it from the list
205 				WaitingThreadList::iterator ii = find(
206 					m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
207 				OSL_ASSERT( ii != m_lstThreads.end() );
208 				m_lstThreads.erase( ii );
209 			}
210 		}
211 
212 		osl_destroyCondition( waitingThread.condition );
213 	}
214 
215 	void ThreadPool::createThread( JobQueue *pQueue ,
216 								   const ByteSequence &aThreadId,
217 								   sal_Bool bAsynchron )
218 	{
219 		sal_Bool bCreate = sal_True;
220 		{
221 			// Can a thread be reused ?
222 			MutexGuard guard( m_mutexWaitingThreadList );
223 			if( ! m_lstThreads.empty() )
224 			{
225 				// inform the thread and let it go
226 				struct WaitingThread *pWaitingThread = m_lstThreads.back();
227 				pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
228 				pWaitingThread->thread = 0;
229 
230 				// remove from list
231 				m_lstThreads.pop_back();
232 
233 				// let the thread go
234 				osl_setCondition( pWaitingThread->condition );
235 				bCreate = sal_False;
236 			}
237 		}
238 
239 		if( bCreate )
240 		{
241 			ORequestThread *pThread =
242 				new ORequestThread( pQueue , aThreadId, bAsynchron);
243 			// deletes itself !
244 			pThread->create();
245 		}
246 	}
247 
248 	sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
249 	{
250 		MutexGuard guard( m_mutex );
251 
252 		ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
253 		OSL_ASSERT( ii != m_mapQueue.end() );
254 
255 		if( bAsynchron )
256 		{
257 			if( ! (*ii).second.second->isEmpty() )
258 			{
259 				// another thread has put something into the queue
260 				return sal_False;
261 			}
262 
263 			(*ii).second.second = 0;
264 			if( (*ii).second.first )
265 			{
266 				// all oneway request have been processed, now
267 				// synchronus requests may go on
268 				(*ii).second.first->resume();
269 			}
270 		}
271 		else
272 		{
273 			if( ! (*ii).second.first->isEmpty() )
274 			{
275 				// another thread has put something into the queue
276 				return sal_False;
277 			}
278 			(*ii).second.first = 0;
279 		}
280 
281 		if( 0 == (*ii).second.first && 0 == (*ii).second.second )
282 		{
283 			m_mapQueue.erase( ii );
284 		}
285 
286 		return sal_True;
287 	}
288 
289 
290 	void ThreadPool::addJob(
291 		const ByteSequence &aThreadId ,
292 		sal_Bool bAsynchron,
293 		void *pThreadSpecificData,
294 		RequestFun * doRequest )
295 	{
296 		sal_Bool bCreateThread = sal_False;
297 		JobQueue *pQueue = 0;
298 		{
299 			MutexGuard guard( m_mutex );
300 
301 			ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
302 
303 			if( ii == m_mapQueue.end() )
304 			{
305 				m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 );
306 				ii = m_mapQueue.find( aThreadId );
307 				OSL_ASSERT( ii != m_mapQueue.end() );
308 			}
309 
310 			if( bAsynchron )
311 			{
312 				if( ! (*ii).second.second )
313 				{
314 					(*ii).second.second = new JobQueue();
315 					bCreateThread = sal_True;
316 				}
317 				pQueue = (*ii).second.second;
318 			}
319 			else
320 			{
321 				if( ! (*ii).second.first )
322 				{
323 					(*ii).second.first = new JobQueue();
324 					bCreateThread = sal_True;
325 				}
326 				pQueue = (*ii).second.first;
327 
328 				if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
329 				{
330 					pQueue->suspend();
331 				}
332 			}
333 			pQueue->add( pThreadSpecificData , doRequest );
334 		}
335 
336 		if( bCreateThread )
337 		{
338 			createThread( pQueue , aThreadId , bAsynchron);
339 		}
340 	}
341 
342 	void ThreadPool::prepare( const ByteSequence &aThreadId )
343 	{
344 		MutexGuard guard( m_mutex );
345 
346 		ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
347 
348 		if( ii == m_mapQueue.end() )
349 		{
350 			JobQueue *p = new JobQueue();
351 			m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 );
352 		}
353 		else if( 0 == (*ii).second.first )
354 		{
355 			(*ii).second.first = new JobQueue();
356 		}
357 	}
358 
359 	void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
360 	{
361 		JobQueue *pQueue = 0;
362 		{
363 			MutexGuard guard( m_mutex );
364 
365 			ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
366 
367 			OSL_ASSERT( ii != m_mapQueue.end() );
368 			pQueue = (*ii).second.first;
369 		}
370 
371 		OSL_ASSERT( pQueue );
372 		void *pReturn = pQueue->enter( nDisposeId );
373 
374 		if( pQueue->isCallstackEmpty() )
375 		{
376 			if( revokeQueue( aThreadId , sal_False) )
377 			{
378 				// remove queue
379 				delete pQueue;
380 			}
381 		}
382 		return pReturn;
383 	}
384 }
385 
386 
387 using namespace cppu_threadpool;
388 
389 struct uno_ThreadPool_Equal
390 {
391 	sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
392 		{
393 			return a == b;
394 		}
395 };
396 
397 struct uno_ThreadPool_Hash
398 {
399 	sal_Size operator () ( const uno_ThreadPool &a  )  const
400 		{
401 			return (sal_Size) a;
402 		}
403 };
404 
405 
406 
407 typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
408 
409 static ThreadpoolHashSet *g_pThreadpoolHashSet;
410 
411 struct _uno_ThreadPool
412 {
413 	sal_Int32 dummy;
414 };
415 
416 extern "C" uno_ThreadPool SAL_CALL
417 uno_threadpool_create() SAL_THROW_EXTERN_C()
418 {
419 	MutexGuard guard( Mutex::getGlobalMutex() );
420 	if( ! g_pThreadpoolHashSet )
421 	{
422 		g_pThreadpoolHashSet = new ThreadpoolHashSet();
423 	}
424 
425 	// Just ensure that the handle is unique in the process (via heap)
426 	uno_ThreadPool h = new struct _uno_ThreadPool;
427 	g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) );
428 	return h;
429 }
430 
431 extern "C" void SAL_CALL
432 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
433 {
434 	sal_Sequence *pThreadId = 0;
435 	uno_getIdOfCurrentThread( &pThreadId );
436 	ThreadPool::getInstance()->prepare( pThreadId );
437 	rtl_byte_sequence_release( pThreadId );
438 	uno_releaseIdFromCurrentThread();
439 }
440 
441 extern "C" void SAL_CALL
442 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
443 	SAL_THROW_EXTERN_C()
444 {
445 	sal_Sequence *pThreadId = 0;
446 	uno_getIdOfCurrentThread( &pThreadId );
447 	*ppJob =
448 		ThreadPool::getInstance()->enter(
449             pThreadId,
450             sal::static_int_cast< sal_Int64 >(
451                 reinterpret_cast< sal_IntPtr >(hPool)) );
452 	rtl_byte_sequence_release( pThreadId );
453 	uno_releaseIdFromCurrentThread();
454 }
455 
456 extern "C" void SAL_CALL
457 uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
458 {
459 	// we might do here some tiding up in case a thread called attach but never detach
460 }
461 
462 extern "C" void SAL_CALL
463 uno_threadpool_putJob(
464 	uno_ThreadPool,
465 	sal_Sequence *pThreadId,
466 	void *pJob,
467 	void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
468 	sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
469 {
470 	ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest );
471 }
472 
473 extern "C" void SAL_CALL
474 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
475 {
476 	ThreadPool::getInstance()->dispose(
477         sal::static_int_cast< sal_Int64 >(
478             reinterpret_cast< sal_IntPtr >(hPool)) );
479 }
480 
481 extern "C" void SAL_CALL
482 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
483 {
484 	ThreadPool::getInstance()->stopDisposing(
485         sal::static_int_cast< sal_Int64 >(
486             reinterpret_cast< sal_IntPtr >(hPool)) );
487 
488 	if( hPool )
489 	{
490 		// special treatment for 0 !
491 		OSL_ASSERT( g_pThreadpoolHashSet );
492 
493 		MutexGuard guard( Mutex::getGlobalMutex() );
494 
495 		ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
496 		OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
497 		g_pThreadpoolHashSet->erase( ii );
498 		delete hPool;
499 
500 		if( g_pThreadpoolHashSet->empty() )
501 		{
502 			delete g_pThreadpoolHashSet;
503 			g_pThreadpoolHashSet = 0;
504 		}
505 	}
506 }
507