xref: /trunk/main/cppu/source/threadpool/thread.cxx (revision cdf0e10c4e3984b49a9502b011690b615761d4a3)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_cppu.hxx"
30 #include <stdio.h>
31 #include <osl/diagnose.h>
32 #include <uno/threadpool.h>
33 
34 #include <rtl/instance.hxx>
35 
36 #include "thread.hxx"
37 #include "jobqueue.hxx"
38 #include "threadpool.hxx"
39 
40 
41 using namespace osl;
42 extern "C" {
43 
44 void SAL_CALL cppu_requestThreadWorker( void *pVoid )
45 {
46     ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
47 
48     pThread->run();
49     pThread->onTerminated();
50 }
51 
52 }
53 
54 namespace cppu_threadpool {
55 
56 // ----------------------------------------------------------------------------------
57     ThreadAdmin::~ThreadAdmin()
58     {
59 #if OSL_DEBUG_LEVEL > 1
60         if( m_lst.size() )
61         {
62             fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) );
63         }
64 #endif
65     }
66 
67     void ThreadAdmin::add( ORequestThread *p )
68     {
69         MutexGuard aGuard( m_mutex );
70         m_lst.push_back( p );
71     }
72 
73     void ThreadAdmin::remove( ORequestThread * p )
74     {
75         MutexGuard aGuard( m_mutex );
76         ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
77         OSL_ASSERT( ii != m_lst.end() );
78         m_lst.erase( ii );
79     }
80 
81     void ThreadAdmin::join()
82     {
83         ORequestThread *pCurrent;
84         do
85         {
86             pCurrent = 0;
87             {
88                 MutexGuard aGuard( m_mutex );
89                 if( ! m_lst.empty() )
90                 {
91                     pCurrent = m_lst.front();
92                     pCurrent->setDeleteSelf( sal_False );
93                 }
94             }
95             if ( pCurrent )
96             {
97                 pCurrent->join();
98                 delete pCurrent;
99             }
100         } while( pCurrent );
101     }
102 
103     struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
104     {
105         ThreadAdminHolder operator () () {
106             ThreadAdminHolder aRet(new ThreadAdmin());
107             return aRet;
108         }
109     };
110 
111     ThreadAdminHolder& ThreadAdmin::getInstance()
112     {
113         return theThreadAdmin::get();
114     }
115 
116 // ----------------------------------------------------------------------------------
117     ORequestThread::ORequestThread( JobQueue *pQueue,
118                                     const ByteSequence &aThreadId,
119                                     sal_Bool bAsynchron )
120         : m_thread( 0 )
121         , m_aThreadAdmin( ThreadAdmin::getInstance() )
122         , m_pQueue( pQueue )
123         , m_aThreadId( aThreadId )
124         , m_bAsynchron( bAsynchron )
125         , m_bDeleteSelf( sal_True )
126     {
127         m_aThreadAdmin->add( this );
128     }
129 
130 
131     ORequestThread::~ORequestThread()
132     {
133         if (m_thread != 0)
134         {
135             osl_destroyThread(m_thread);
136         }
137     }
138 
139 
140     void ORequestThread::setTask( JobQueue *pQueue,
141                                   const ByteSequence &aThreadId,
142                                   sal_Bool bAsynchron )
143     {
144         m_pQueue = pQueue;
145         m_aThreadId = aThreadId;
146         m_bAsynchron = bAsynchron;
147     }
148 
149     sal_Bool ORequestThread::create()
150     {
151         OSL_ASSERT(m_thread == 0);  // only one running thread per instance
152 
153         m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this);
154         if ( m_thread )
155         {
156             osl_resumeThread( m_thread );
157         }
158 
159         return m_thread != 0;
160     }
161 
162     void ORequestThread::join()
163     {
164         osl_joinWithThread( m_thread );
165     }
166 
167     void ORequestThread::onTerminated()
168     {
169         m_aThreadAdmin->remove( this );
170         if( m_bDeleteSelf )
171         {
172             delete this;
173         }
174     }
175 
176     void ORequestThread::run()
177     {
178         ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance();
179 
180         while ( m_pQueue )
181         {
182             if( ! m_bAsynchron )
183             {
184                 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
185                 {
186                     OSL_ASSERT( false );
187                 }
188             }
189 
190             while( ! m_pQueue->isEmpty() )
191             {
192                 // Note : Oneways should not get a disposable disposeid,
193                 //        It does not make sense to dispose a call in this state.
194                 //        That's way we put it an disposeid, that can't be used otherwise.
195                 m_pQueue->enter(
196                     sal::static_int_cast< sal_Int64 >(
197                         reinterpret_cast< sal_IntPtr >(this)),
198                     sal_True );
199 
200                 if( m_pQueue->isEmpty() )
201                 {
202                     theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
203                     // Note : revokeQueue might have failed because m_pQueue.isEmpty()
204                     //        may be false (race).
205                 }
206             }
207 
208             delete m_pQueue;
209             m_pQueue = 0;
210 
211             if( ! m_bAsynchron )
212             {
213                 uno_releaseIdFromCurrentThread();
214             }
215 
216             theThreadPool->waitInPool( this );
217         }
218     }
219 }
220