1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 // MARKER(update_precomp.py): autogen include statement, do not remove 25 #include "precompiled_comphelper.hxx" 26 #include <comphelper/asyncnotification.hxx> 27 #include <osl/diagnose.h> 28 #include <osl/mutex.hxx> 29 #include <osl/conditn.hxx> 30 #include <comphelper/guarding.hxx> 31 32 #include <deque> 33 #include <set> 34 #include <functional> 35 #include <algorithm> 36 37 //........................................................................ 38 namespace comphelper 39 { 40 //........................................................................ 41 42 //==================================================================== 43 //= AnyEvent 44 //==================================================================== 45 //-------------------------------------------------------------------- AnyEvent()46 AnyEvent::AnyEvent() 47 :m_refCount( 0 ) 48 { 49 } 50 51 //-------------------------------------------------------------------- ~AnyEvent()52 AnyEvent::~AnyEvent() 53 { 54 } 55 56 //-------------------------------------------------------------------- acquire()57 oslInterlockedCount SAL_CALL AnyEvent::acquire() 58 { 59 return osl_incrementInterlockedCount( &m_refCount ); 60 } 61 62 //-------------------------------------------------------------------- release()63 oslInterlockedCount SAL_CALL AnyEvent::release() 64 { 65 if ( 0 == osl_decrementInterlockedCount( &m_refCount ) ) 66 { 67 delete this; 68 return 0; 69 } 70 return m_refCount; 71 } 72 73 //==================================================================== 74 //= ProcessableEvent 75 //==================================================================== 76 struct ProcessableEvent 77 { 78 AnyEventRef aEvent; 79 ::rtl::Reference< IEventProcessor > xProcessor; 80 ProcessableEventcomphelper::ProcessableEvent81 ProcessableEvent( const AnyEventRef& _rEvent, const ::rtl::Reference< IEventProcessor >& _xProcessor ) 82 :aEvent( _rEvent ) 83 ,xProcessor( _xProcessor ) 84 { 85 } 86 ProcessableEventcomphelper::ProcessableEvent87 ProcessableEvent( const ProcessableEvent& _rRHS ) 88 :aEvent( _rRHS.aEvent ) 89 ,xProcessor( _rRHS.xProcessor ) 90 { 91 } 92 operator =comphelper::ProcessableEvent93 ProcessableEvent& operator=( const ProcessableEvent& _rRHS ) 94 { 95 aEvent = _rRHS.aEvent; 96 xProcessor = _rRHS.xProcessor; 97 return *this; 98 } 99 }; 100 101 //==================================================================== 102 typedef ::std::deque< ProcessableEvent > EventQueue; 103 104 //==================================================================== 105 struct EqualProcessor : public ::std::unary_function< ProcessableEvent, bool > 106 { 107 const ::rtl::Reference< IEventProcessor >& rProcessor; EqualProcessorcomphelper::EqualProcessor108 EqualProcessor( const ::rtl::Reference< IEventProcessor >& _rProcessor ) :rProcessor( _rProcessor ) { } 109 operator ()comphelper::EqualProcessor110 bool operator()( const ProcessableEvent& _rEvent ) 111 { 112 return _rEvent.xProcessor.get() == rProcessor.get(); 113 } 114 }; 115 116 //==================================================================== 117 //= EventNotifierImpl 118 //==================================================================== 119 struct EventNotifierImpl 120 { 121 ::osl::Mutex aMutex; 122 oslInterlockedCount m_refCount; 123 ::osl::Condition aPendingActions; 124 EventQueue aEvents; 125 ::std::set< ::rtl::Reference< IEventProcessor > > 126 m_aDeadProcessors; 127 EventNotifierImplcomphelper::EventNotifierImpl128 EventNotifierImpl() 129 :m_refCount( 0 ) 130 { 131 } 132 133 private: 134 EventNotifierImpl( const EventNotifierImpl& ); // never implemented 135 EventNotifierImpl& operator=( const EventNotifierImpl& ); // never implemented 136 }; 137 138 //==================================================================== 139 //= AsyncEventNotifier 140 //==================================================================== 141 //-------------------------------------------------------------------- AsyncEventNotifier()142 AsyncEventNotifier::AsyncEventNotifier() 143 :m_pImpl( new EventNotifierImpl ) 144 { 145 } 146 147 //-------------------------------------------------------------------- ~AsyncEventNotifier()148 AsyncEventNotifier::~AsyncEventNotifier() 149 { 150 } 151 152 //-------------------------------------------------------------------- removeEventsForProcessor(const::rtl::Reference<IEventProcessor> & _xProcessor)153 void AsyncEventNotifier::removeEventsForProcessor( const ::rtl::Reference< IEventProcessor >& _xProcessor ) 154 { 155 ::osl::MutexGuard aGuard( m_pImpl->aMutex ); 156 157 // remove all events for this processor 158 ::std::remove_if( m_pImpl->aEvents.begin(), m_pImpl->aEvents.end(), EqualProcessor( _xProcessor ) ); 159 160 // and just in case that an event for exactly this processor has just been 161 // popped from the queue, but not yet processed: remember it: 162 m_pImpl->m_aDeadProcessors.insert( _xProcessor ); 163 } 164 165 //-------------------------------------------------------------------- terminate()166 void SAL_CALL AsyncEventNotifier::terminate() 167 { 168 ::osl::MutexGuard aGuard( m_pImpl->aMutex ); 169 170 // remember the termination request 171 AsyncEventNotifier_TBASE::terminate(); 172 173 // awake the thread 174 m_pImpl->aPendingActions.set(); 175 } 176 177 //-------------------------------------------------------------------- addEvent(const AnyEventRef & _rEvent,const::rtl::Reference<IEventProcessor> & _xProcessor)178 void AsyncEventNotifier::addEvent( const AnyEventRef& _rEvent, const ::rtl::Reference< IEventProcessor >& _xProcessor ) 179 { 180 ::osl::MutexGuard aGuard( m_pImpl->aMutex ); 181 182 OSL_TRACE( "AsyncEventNotifier(%p): adding %p\n", this, _rEvent.get() ); 183 // remember this event 184 m_pImpl->aEvents.push_back( ProcessableEvent( _rEvent, _xProcessor ) ); 185 186 // awake the thread 187 m_pImpl->aPendingActions.set(); 188 } 189 190 //-------------------------------------------------------------------- run()191 void AsyncEventNotifier::run() 192 { 193 acquire(); 194 195 // keep us alive, in case we're terminated in the mid of the following 196 ::rtl::Reference< AsyncEventNotifier > xKeepAlive( this ); 197 198 do 199 { 200 AnyEventRef aNextEvent; 201 ::rtl::Reference< IEventProcessor > xNextProcessor; 202 203 ::osl::ClearableMutexGuard aGuard( m_pImpl->aMutex ); 204 while ( m_pImpl->aEvents.size() > 0 ) 205 { 206 ProcessableEvent aEvent( m_pImpl->aEvents.front() ); 207 aNextEvent = aEvent.aEvent; 208 xNextProcessor = aEvent.xProcessor; 209 m_pImpl->aEvents.pop_front(); 210 211 OSL_TRACE( "AsyncEventNotifier(%p): popping %p\n", this, aNextEvent.get() ); 212 213 if ( !aNextEvent.get() ) 214 continue; 215 216 // process the event, but only if it's processor did not die inbetween 217 ::std::set< ::rtl::Reference< IEventProcessor > >::iterator deadPos = m_pImpl->m_aDeadProcessors.find( xNextProcessor ); 218 if ( deadPos != m_pImpl->m_aDeadProcessors.end() ) 219 { 220 m_pImpl->m_aDeadProcessors.erase( xNextProcessor ); 221 xNextProcessor.clear(); 222 OSL_TRACE( "AsyncEventNotifier(%p): removing %p\n", this, aNextEvent.get() ); 223 } 224 225 // if there was a termination request (->terminate), respect it 226 if ( !schedule() ) 227 return; 228 229 { 230 ::comphelper::MutexRelease aReleaseOnce( m_pImpl->aMutex ); 231 if ( xNextProcessor.get() ) 232 xNextProcessor->processEvent( *aNextEvent.get() ); 233 } 234 } 235 236 // if there was a termination request (->terminate), respect it 237 if ( !schedule() ) 238 return; 239 240 // wait for new events to process 241 aGuard.clear(); 242 m_pImpl->aPendingActions.reset(); 243 m_pImpl->aPendingActions.wait(); 244 } 245 while ( sal_True ); 246 } 247 248 //-------------------------------------------------------------------- onTerminated()249 void SAL_CALL AsyncEventNotifier::onTerminated() 250 { 251 AsyncEventNotifier_TBASE::onTerminated(); 252 // when we were started (->run), we aquired ourself. Release this now 253 // that we were finally terminated 254 release(); 255 } 256 257 //-------------------------------------------------------------------- acquire()258 oslInterlockedCount SAL_CALL AsyncEventNotifier::acquire() 259 { 260 return osl_incrementInterlockedCount( &m_pImpl->m_refCount ); 261 } 262 263 //-------------------------------------------------------------------- release()264 oslInterlockedCount SAL_CALL AsyncEventNotifier::release() 265 { 266 if ( 0 == osl_decrementInterlockedCount( &m_pImpl->m_refCount ) ) 267 { 268 delete this; 269 return 0; 270 } 271 return m_pImpl->m_refCount; 272 } 273 274 //........................................................................ 275 } // namespace comphelper 276 //........................................................................ 277 278