1 /************************************************************************* 2 * 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * Copyright 2000, 2010 Oracle and/or its affiliates. 6 * 7 * OpenOffice.org - a multi-platform office productivity suite 8 * 9 * This file is part of OpenOffice.org. 10 * 11 * OpenOffice.org is free software: you can redistribute it and/or modify 12 * it under the terms of the GNU Lesser General Public License version 3 13 * only, as published by the Free Software Foundation. 14 * 15 * OpenOffice.org is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU Lesser General Public License version 3 for more details 19 * (a copy is included in the LICENSE file that accompanied this code). 20 * 21 * You should have received a copy of the GNU Lesser General Public License 22 * version 3 along with OpenOffice.org. If not, see 23 * <http://www.openoffice.org/license.html> 24 * for a copy of the LGPLv3 License. 25 * 26 ************************************************************************/ 27 28 // MARKER(update_precomp.py): autogen include statement, do not remove 29 #include "precompiled_cppu.hxx" 30 #include <hash_set> 31 #include <stdio.h> 32 33 #include <osl/diagnose.h> 34 #include <osl/mutex.hxx> 35 #include <osl/thread.h> 36 #include <rtl/instance.hxx> 37 38 #include <uno/threadpool.h> 39 40 #include "threadpool.hxx" 41 #include "thread.hxx" 42 43 using namespace ::std; 44 using namespace ::osl; 45 46 namespace cppu_threadpool 47 { 48 struct theDisposedCallerAdmin : 49 public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin > 50 { 51 DisposedCallerAdminHolder operator () () { 52 return DisposedCallerAdminHolder(new DisposedCallerAdmin()); 53 } 54 }; 55 56 DisposedCallerAdminHolder DisposedCallerAdmin::getInstance() 57 { 58 return theDisposedCallerAdmin::get(); 59 } 60 61 DisposedCallerAdmin::~DisposedCallerAdmin() 62 { 63 #if OSL_DEBUG_LEVEL > 1 64 if( !m_lst.empty() ) 65 { 66 printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( ))); 67 } 68 #endif 69 } 70 71 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId ) 72 { 73 MutexGuard guard( m_mutex ); 74 m_lst.push_back( nDisposeId ); 75 } 76 77 void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId ) 78 { 79 MutexGuard guard( m_mutex ); 80 for( DisposedCallerList::iterator ii = m_lst.begin() ; 81 ii != m_lst.end() ; 82 ++ ii ) 83 { 84 if( (*ii) == nDisposeId ) 85 { 86 m_lst.erase( ii ); 87 break; 88 } 89 } 90 } 91 92 sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId ) 93 { 94 MutexGuard guard( m_mutex ); 95 for( DisposedCallerList::iterator ii = m_lst.begin() ; 96 ii != m_lst.end() ; 97 ++ ii ) 98 { 99 if( (*ii) == nDisposeId ) 100 { 101 return sal_True; 102 } 103 } 104 return sal_False; 105 } 106 107 108 //------------------------------------------------------------------------------- 109 110 struct theThreadPool : 111 public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool > 112 { 113 ThreadPoolHolder operator () () { 114 ThreadPoolHolder aRet(new ThreadPool()); 115 return aRet; 116 } 117 }; 118 119 ThreadPool::ThreadPool() 120 { 121 m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); 122 } 123 124 ThreadPool::~ThreadPool() 125 { 126 #if OSL_DEBUG_LEVEL > 1 127 if( m_mapQueue.size() ) 128 { 129 printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) ); 130 } 131 #endif 132 } 133 ThreadPoolHolder ThreadPool::getInstance() 134 { 135 return theThreadPool::get(); 136 } 137 138 139 void ThreadPool::dispose( sal_Int64 nDisposeId ) 140 { 141 if( nDisposeId ) 142 { 143 m_DisposedCallerAdmin->dispose( nDisposeId ); 144 145 MutexGuard guard( m_mutex ); 146 for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; 147 ii != m_mapQueue.end(); 148 ++ii) 149 { 150 if( (*ii).second.first ) 151 { 152 (*ii).second.first->dispose( nDisposeId ); 153 } 154 if( (*ii).second.second ) 155 { 156 (*ii).second.second->dispose( nDisposeId ); 157 } 158 } 159 } 160 else 161 { 162 { 163 MutexGuard guard( m_mutexWaitingThreadList ); 164 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; 165 ii != m_lstThreads.end() ; 166 ++ ii ) 167 { 168 // wake the threads up 169 osl_setCondition( (*ii)->condition ); 170 } 171 } 172 ThreadAdmin::getInstance()->join(); 173 } 174 } 175 176 void ThreadPool::stopDisposing( sal_Int64 nDisposeId ) 177 { 178 m_DisposedCallerAdmin->stopDisposing( nDisposeId ); 179 } 180 181 /****************** 182 * This methods lets the thread wait a certain amount of time. If within this timespan 183 * a new request comes in, this thread is reused. This is done only to improve performance, 184 * it is not required for threadpool functionality. 185 ******************/ 186 void ThreadPool::waitInPool( ORequestThread * pThread ) 187 { 188 struct WaitingThread waitingThread; 189 waitingThread.condition = osl_createCondition(); 190 waitingThread.thread = pThread; 191 { 192 MutexGuard guard( m_mutexWaitingThreadList ); 193 m_lstThreads.push_front( &waitingThread ); 194 } 195 196 // let the thread wait 2 seconds 197 TimeValue time = { 2 , 0 }; 198 osl_waitCondition( waitingThread.condition , &time ); 199 200 { 201 MutexGuard guard ( m_mutexWaitingThreadList ); 202 if( waitingThread.thread ) 203 { 204 // thread wasn't reused, remove it from the list 205 WaitingThreadList::iterator ii = find( 206 m_lstThreads.begin(), m_lstThreads.end(), &waitingThread ); 207 OSL_ASSERT( ii != m_lstThreads.end() ); 208 m_lstThreads.erase( ii ); 209 } 210 } 211 212 osl_destroyCondition( waitingThread.condition ); 213 } 214 215 void ThreadPool::createThread( JobQueue *pQueue , 216 const ByteSequence &aThreadId, 217 sal_Bool bAsynchron ) 218 { 219 sal_Bool bCreate = sal_True; 220 { 221 // Can a thread be reused ? 222 MutexGuard guard( m_mutexWaitingThreadList ); 223 if( ! m_lstThreads.empty() ) 224 { 225 // inform the thread and let it go 226 struct WaitingThread *pWaitingThread = m_lstThreads.back(); 227 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron ); 228 pWaitingThread->thread = 0; 229 230 // remove from list 231 m_lstThreads.pop_back(); 232 233 // let the thread go 234 osl_setCondition( pWaitingThread->condition ); 235 bCreate = sal_False; 236 } 237 } 238 239 if( bCreate ) 240 { 241 ORequestThread *pThread = 242 new ORequestThread( pQueue , aThreadId, bAsynchron); 243 // deletes itself ! 244 pThread->create(); 245 } 246 } 247 248 sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron ) 249 { 250 MutexGuard guard( m_mutex ); 251 252 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 253 OSL_ASSERT( ii != m_mapQueue.end() ); 254 255 if( bAsynchron ) 256 { 257 if( ! (*ii).second.second->isEmpty() ) 258 { 259 // another thread has put something into the queue 260 return sal_False; 261 } 262 263 (*ii).second.second = 0; 264 if( (*ii).second.first ) 265 { 266 // all oneway request have been processed, now 267 // synchronus requests may go on 268 (*ii).second.first->resume(); 269 } 270 } 271 else 272 { 273 if( ! (*ii).second.first->isEmpty() ) 274 { 275 // another thread has put something into the queue 276 return sal_False; 277 } 278 (*ii).second.first = 0; 279 } 280 281 if( 0 == (*ii).second.first && 0 == (*ii).second.second ) 282 { 283 m_mapQueue.erase( ii ); 284 } 285 286 return sal_True; 287 } 288 289 290 void ThreadPool::addJob( 291 const ByteSequence &aThreadId , 292 sal_Bool bAsynchron, 293 void *pThreadSpecificData, 294 RequestFun * doRequest ) 295 { 296 sal_Bool bCreateThread = sal_False; 297 JobQueue *pQueue = 0; 298 { 299 MutexGuard guard( m_mutex ); 300 301 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 302 303 if( ii == m_mapQueue.end() ) 304 { 305 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 ); 306 ii = m_mapQueue.find( aThreadId ); 307 OSL_ASSERT( ii != m_mapQueue.end() ); 308 } 309 310 if( bAsynchron ) 311 { 312 if( ! (*ii).second.second ) 313 { 314 (*ii).second.second = new JobQueue(); 315 bCreateThread = sal_True; 316 } 317 pQueue = (*ii).second.second; 318 } 319 else 320 { 321 if( ! (*ii).second.first ) 322 { 323 (*ii).second.first = new JobQueue(); 324 bCreateThread = sal_True; 325 } 326 pQueue = (*ii).second.first; 327 328 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) ) 329 { 330 pQueue->suspend(); 331 } 332 } 333 pQueue->add( pThreadSpecificData , doRequest ); 334 } 335 336 if( bCreateThread ) 337 { 338 createThread( pQueue , aThreadId , bAsynchron); 339 } 340 } 341 342 void ThreadPool::prepare( const ByteSequence &aThreadId ) 343 { 344 MutexGuard guard( m_mutex ); 345 346 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 347 348 if( ii == m_mapQueue.end() ) 349 { 350 JobQueue *p = new JobQueue(); 351 m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 ); 352 } 353 else if( 0 == (*ii).second.first ) 354 { 355 (*ii).second.first = new JobQueue(); 356 } 357 } 358 359 void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId ) 360 { 361 JobQueue *pQueue = 0; 362 { 363 MutexGuard guard( m_mutex ); 364 365 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 366 367 OSL_ASSERT( ii != m_mapQueue.end() ); 368 pQueue = (*ii).second.first; 369 } 370 371 OSL_ASSERT( pQueue ); 372 void *pReturn = pQueue->enter( nDisposeId ); 373 374 if( pQueue->isCallstackEmpty() ) 375 { 376 if( revokeQueue( aThreadId , sal_False) ) 377 { 378 // remove queue 379 delete pQueue; 380 } 381 } 382 return pReturn; 383 } 384 } 385 386 387 using namespace cppu_threadpool; 388 389 struct uno_ThreadPool_Equal 390 { 391 sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const 392 { 393 return a == b; 394 } 395 }; 396 397 struct uno_ThreadPool_Hash 398 { 399 sal_Size operator () ( const uno_ThreadPool &a ) const 400 { 401 return (sal_Size) a; 402 } 403 }; 404 405 406 407 typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet; 408 409 static ThreadpoolHashSet *g_pThreadpoolHashSet; 410 411 struct _uno_ThreadPool 412 { 413 sal_Int32 dummy; 414 }; 415 416 extern "C" uno_ThreadPool SAL_CALL 417 uno_threadpool_create() SAL_THROW_EXTERN_C() 418 { 419 MutexGuard guard( Mutex::getGlobalMutex() ); 420 if( ! g_pThreadpoolHashSet ) 421 { 422 g_pThreadpoolHashSet = new ThreadpoolHashSet(); 423 } 424 425 // Just ensure that the handle is unique in the process (via heap) 426 uno_ThreadPool h = new struct _uno_ThreadPool; 427 g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); 428 return h; 429 } 430 431 extern "C" void SAL_CALL 432 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C() 433 { 434 sal_Sequence *pThreadId = 0; 435 uno_getIdOfCurrentThread( &pThreadId ); 436 ThreadPool::getInstance()->prepare( pThreadId ); 437 rtl_byte_sequence_release( pThreadId ); 438 uno_releaseIdFromCurrentThread(); 439 } 440 441 extern "C" void SAL_CALL 442 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) 443 SAL_THROW_EXTERN_C() 444 { 445 sal_Sequence *pThreadId = 0; 446 uno_getIdOfCurrentThread( &pThreadId ); 447 *ppJob = 448 ThreadPool::getInstance()->enter( 449 pThreadId, 450 sal::static_int_cast< sal_Int64 >( 451 reinterpret_cast< sal_IntPtr >(hPool)) ); 452 rtl_byte_sequence_release( pThreadId ); 453 uno_releaseIdFromCurrentThread(); 454 } 455 456 extern "C" void SAL_CALL 457 uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C() 458 { 459 // we might do here some tiding up in case a thread called attach but never detach 460 } 461 462 extern "C" void SAL_CALL 463 uno_threadpool_putJob( 464 uno_ThreadPool, 465 sal_Sequence *pThreadId, 466 void *pJob, 467 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), 468 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() 469 { 470 ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); 471 } 472 473 extern "C" void SAL_CALL 474 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() 475 { 476 ThreadPool::getInstance()->dispose( 477 sal::static_int_cast< sal_Int64 >( 478 reinterpret_cast< sal_IntPtr >(hPool)) ); 479 } 480 481 extern "C" void SAL_CALL 482 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() 483 { 484 ThreadPool::getInstance()->stopDisposing( 485 sal::static_int_cast< sal_Int64 >( 486 reinterpret_cast< sal_IntPtr >(hPool)) ); 487 488 if( hPool ) 489 { 490 // special treatment for 0 ! 491 OSL_ASSERT( g_pThreadpoolHashSet ); 492 493 MutexGuard guard( Mutex::getGlobalMutex() ); 494 495 ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool ); 496 OSL_ASSERT( ii != g_pThreadpoolHashSet->end() ); 497 g_pThreadpoolHashSet->erase( ii ); 498 delete hPool; 499 500 if( g_pThreadpoolHashSet->empty() ) 501 { 502 delete g_pThreadpoolHashSet; 503 g_pThreadpoolHashSet = 0; 504 } 505 } 506 } 507