1 /**************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21
22
23
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_cppu.hxx"
26 #include <stdio.h>
27 #include <osl/diagnose.h>
28 #include <uno/threadpool.h>
29
30 #include <rtl/instance.hxx>
31
32 #include "thread.hxx"
33 #include "jobqueue.hxx"
34 #include "threadpool.hxx"
35
36
37 using namespace osl;
38 extern "C" {
39
cppu_requestThreadWorker(void * pVoid)40 void SAL_CALL cppu_requestThreadWorker( void *pVoid )
41 {
42 ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
43
44 pThread->run();
45 pThread->onTerminated();
46 }
47
48 }
49
50 namespace cppu_threadpool {
51
52 // ----------------------------------------------------------------------------------
~ThreadAdmin()53 ThreadAdmin::~ThreadAdmin()
54 {
55 #if OSL_DEBUG_LEVEL > 1
56 if( m_lst.size() )
57 {
58 fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) );
59 }
60 #endif
61 }
62
add(ORequestThread * p)63 void ThreadAdmin::add( ORequestThread *p )
64 {
65 MutexGuard aGuard( m_mutex );
66 m_lst.push_back( p );
67 }
68
remove(ORequestThread * p)69 void ThreadAdmin::remove( ORequestThread * p )
70 {
71 MutexGuard aGuard( m_mutex );
72 ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
73 OSL_ASSERT( ii != m_lst.end() );
74 m_lst.erase( ii );
75 }
76
join()77 void ThreadAdmin::join()
78 {
79 ORequestThread *pCurrent;
80 do
81 {
82 pCurrent = 0;
83 {
84 MutexGuard aGuard( m_mutex );
85 if( ! m_lst.empty() )
86 {
87 pCurrent = m_lst.front();
88 pCurrent->setDeleteSelf( sal_False );
89 }
90 }
91 if ( pCurrent )
92 {
93 pCurrent->join();
94 delete pCurrent;
95 }
96 } while( pCurrent );
97 }
98
99 struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
100 {
operator ()cppu_threadpool::theThreadAdmin101 ThreadAdminHolder operator () () {
102 ThreadAdminHolder aRet(new ThreadAdmin());
103 return aRet;
104 }
105 };
106
getInstance()107 ThreadAdminHolder& ThreadAdmin::getInstance()
108 {
109 return theThreadAdmin::get();
110 }
111
112 // ----------------------------------------------------------------------------------
ORequestThread(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)113 ORequestThread::ORequestThread( JobQueue *pQueue,
114 const ByteSequence &aThreadId,
115 sal_Bool bAsynchron )
116 : m_thread( 0 )
117 , m_aThreadAdmin( ThreadAdmin::getInstance() )
118 , m_pQueue( pQueue )
119 , m_aThreadId( aThreadId )
120 , m_bAsynchron( bAsynchron )
121 , m_bDeleteSelf( sal_True )
122 {
123 m_aThreadAdmin->add( this );
124 }
125
126
~ORequestThread()127 ORequestThread::~ORequestThread()
128 {
129 if (m_thread != 0)
130 {
131 osl_destroyThread(m_thread);
132 }
133 }
134
135
setTask(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)136 void ORequestThread::setTask( JobQueue *pQueue,
137 const ByteSequence &aThreadId,
138 sal_Bool bAsynchron )
139 {
140 m_pQueue = pQueue;
141 m_aThreadId = aThreadId;
142 m_bAsynchron = bAsynchron;
143 }
144
create()145 sal_Bool ORequestThread::create()
146 {
147 OSL_ASSERT(m_thread == 0); // only one running thread per instance
148
149 m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this);
150 if ( m_thread )
151 {
152 osl_resumeThread( m_thread );
153 }
154
155 return m_thread != 0;
156 }
157
join()158 void ORequestThread::join()
159 {
160 osl_joinWithThread( m_thread );
161 }
162
onTerminated()163 void ORequestThread::onTerminated()
164 {
165 m_aThreadAdmin->remove( this );
166 if( m_bDeleteSelf )
167 {
168 delete this;
169 }
170 }
171
run()172 void ORequestThread::run()
173 {
174 ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance();
175
176 while ( m_pQueue )
177 {
178 if( ! m_bAsynchron )
179 {
180 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
181 {
182 OSL_ASSERT( false );
183 }
184 }
185
186 while( ! m_pQueue->isEmpty() )
187 {
188 // Note : Oneways should not get a disposable disposeid,
189 // It does not make sense to dispose a call in this state.
190 // That's way we put it an disposeid, that can't be used otherwise.
191 m_pQueue->enter(
192 sal::static_int_cast< sal_Int64 >(
193 reinterpret_cast< sal_IntPtr >(this)),
194 sal_True );
195
196 if( m_pQueue->isEmpty() )
197 {
198 theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
199 // Note : revokeQueue might have failed because m_pQueue.isEmpty()
200 // may be false (race).
201 }
202 }
203
204 delete m_pQueue;
205 m_pQueue = 0;
206
207 if( ! m_bAsynchron )
208 {
209 uno_releaseIdFromCurrentThread();
210 }
211
212 theThreadPool->waitInPool( this );
213 }
214 }
215 }
216