1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_comphelper.hxx"
26 #include <comphelper/asyncnotification.hxx>
27 #include <osl/diagnose.h>
28 #include <osl/mutex.hxx>
29 #include <osl/conditn.hxx>
30 #include <comphelper/guarding.hxx>
31 
32 #include <deque>
33 #include <set>
34 #include <functional>
35 #include <algorithm>
36 
37 //........................................................................
38 namespace comphelper
39 {
40 //........................................................................
41 
42     //====================================================================
43     //= AnyEvent
44     //====================================================================
45     //--------------------------------------------------------------------
AnyEvent()46     AnyEvent::AnyEvent()
47         :m_refCount( 0 )
48     {
49     }
50 
51     //--------------------------------------------------------------------
~AnyEvent()52     AnyEvent::~AnyEvent()
53     {
54     }
55 
56     //--------------------------------------------------------------------
acquire()57     oslInterlockedCount SAL_CALL AnyEvent::acquire()
58     {
59         return osl_incrementInterlockedCount( &m_refCount );
60     }
61 
62     //--------------------------------------------------------------------
release()63     oslInterlockedCount SAL_CALL AnyEvent::release()
64     {
65         if ( 0 == osl_decrementInterlockedCount( &m_refCount ) )
66         {
67             delete this;
68             return 0;
69         }
70         return m_refCount;
71     }
72 
73     //====================================================================
74     //= ProcessableEvent
75     //====================================================================
76     struct ProcessableEvent
77     {
78         AnyEventRef                         aEvent;
79         ::rtl::Reference< IEventProcessor > xProcessor;
80 
ProcessableEventcomphelper::ProcessableEvent81         ProcessableEvent( const AnyEventRef& _rEvent, const ::rtl::Reference< IEventProcessor >& _xProcessor )
82             :aEvent( _rEvent )
83             ,xProcessor( _xProcessor )
84         {
85         }
86 
ProcessableEventcomphelper::ProcessableEvent87         ProcessableEvent( const ProcessableEvent& _rRHS )
88             :aEvent( _rRHS.aEvent )
89             ,xProcessor( _rRHS.xProcessor )
90         {
91         }
92 
operator =comphelper::ProcessableEvent93         ProcessableEvent& operator=( const ProcessableEvent& _rRHS )
94         {
95             aEvent = _rRHS.aEvent;
96             xProcessor = _rRHS.xProcessor;
97             return *this;
98         }
99     };
100 
101     //====================================================================
102     typedef ::std::deque< ProcessableEvent >    EventQueue;
103 
104     //====================================================================
105     struct EqualProcessor : public ::std::unary_function< ProcessableEvent, bool >
106     {
107         const ::rtl::Reference< IEventProcessor >&  rProcessor;
EqualProcessorcomphelper::EqualProcessor108         EqualProcessor( const ::rtl::Reference< IEventProcessor >& _rProcessor ) :rProcessor( _rProcessor ) { }
109 
operator ()comphelper::EqualProcessor110         bool operator()( const ProcessableEvent& _rEvent )
111         {
112             return _rEvent.xProcessor.get() == rProcessor.get();
113         }
114     };
115 
116     //====================================================================
117     //= EventNotifierImpl
118     //====================================================================
119     struct EventNotifierImpl
120     {
121         ::osl::Mutex        aMutex;
122         oslInterlockedCount m_refCount;
123         ::osl::Condition    aPendingActions;
124         EventQueue          aEvents;
125         ::std::set< ::rtl::Reference< IEventProcessor > >
126                             m_aDeadProcessors;
127 
EventNotifierImplcomphelper::EventNotifierImpl128         EventNotifierImpl()
129             :m_refCount( 0 )
130         {
131         }
132 
133     private:
134         EventNotifierImpl( const EventNotifierImpl& );              // never implemented
135         EventNotifierImpl& operator=( const EventNotifierImpl& );   // never implemented
136     };
137 
138     //====================================================================
139     //= AsyncEventNotifier
140     //====================================================================
141     //--------------------------------------------------------------------
AsyncEventNotifier()142     AsyncEventNotifier::AsyncEventNotifier()
143         :m_pImpl( new EventNotifierImpl )
144     {
145     }
146 
147     //--------------------------------------------------------------------
~AsyncEventNotifier()148     AsyncEventNotifier::~AsyncEventNotifier()
149     {
150     }
151 
152     //--------------------------------------------------------------------
removeEventsForProcessor(const::rtl::Reference<IEventProcessor> & _xProcessor)153     void AsyncEventNotifier::removeEventsForProcessor( const ::rtl::Reference< IEventProcessor >& _xProcessor )
154     {
155         ::osl::MutexGuard aGuard( m_pImpl->aMutex );
156 
157         // remove all events for this processor
158         ::std::remove_if( m_pImpl->aEvents.begin(), m_pImpl->aEvents.end(), EqualProcessor( _xProcessor ) );
159 
160         // and just in case that an event for exactly this processor has just been
161         // popped from the queue, but not yet processed: remember it:
162         m_pImpl->m_aDeadProcessors.insert( _xProcessor );
163     }
164 
165     //--------------------------------------------------------------------
terminate()166     void SAL_CALL AsyncEventNotifier::terminate()
167     {
168         ::osl::MutexGuard aGuard( m_pImpl->aMutex );
169 
170         // remember the termination request
171         AsyncEventNotifier_TBASE::terminate();
172 
173         // awake the thread
174         m_pImpl->aPendingActions.set();
175     }
176 
177     //--------------------------------------------------------------------
addEvent(const AnyEventRef & _rEvent,const::rtl::Reference<IEventProcessor> & _xProcessor)178     void AsyncEventNotifier::addEvent( const AnyEventRef& _rEvent, const ::rtl::Reference< IEventProcessor >& _xProcessor )
179     {
180         ::osl::MutexGuard aGuard( m_pImpl->aMutex );
181 
182         OSL_TRACE( "AsyncEventNotifier(%p): adding %p\n", this, _rEvent.get() );
183         // remember this event
184         m_pImpl->aEvents.push_back( ProcessableEvent( _rEvent, _xProcessor ) );
185 
186         // awake the thread
187         m_pImpl->aPendingActions.set();
188     }
189 
190     //--------------------------------------------------------------------
run()191     void AsyncEventNotifier::run()
192     {
193         acquire();
194 
195         // keep us alive, in case we're terminated in the mid of the following
196         ::rtl::Reference< AsyncEventNotifier > xKeepAlive( this );
197 
198         do
199         {
200             AnyEventRef aNextEvent;
201             ::rtl::Reference< IEventProcessor > xNextProcessor;
202 
203             ::osl::ClearableMutexGuard aGuard( m_pImpl->aMutex );
204             while ( m_pImpl->aEvents.size() > 0 )
205             {
206                 ProcessableEvent aEvent( m_pImpl->aEvents.front() );
207                 aNextEvent = aEvent.aEvent;
208                 xNextProcessor = aEvent.xProcessor;
209                 m_pImpl->aEvents.pop_front();
210 
211                 OSL_TRACE( "AsyncEventNotifier(%p): popping %p\n", this, aNextEvent.get() );
212 
213                 if ( !aNextEvent.get() )
214                     continue;
215 
216                 // process the event, but only if it's processor did not die inbetween
217                 ::std::set< ::rtl::Reference< IEventProcessor > >::iterator deadPos = m_pImpl->m_aDeadProcessors.find( xNextProcessor );
218                 if ( deadPos != m_pImpl->m_aDeadProcessors.end() )
219                 {
220                     m_pImpl->m_aDeadProcessors.erase( xNextProcessor );
221                     xNextProcessor.clear();
222                     OSL_TRACE( "AsyncEventNotifier(%p): removing %p\n", this, aNextEvent.get() );
223                 }
224 
225                 // if there was a termination request (->terminate), respect it
226                 if ( !schedule() )
227                     return;
228 
229                 {
230                     ::comphelper::MutexRelease aReleaseOnce( m_pImpl->aMutex );
231                     if ( xNextProcessor.get() )
232                         xNextProcessor->processEvent( *aNextEvent.get() );
233                 }
234             }
235 
236             // if there was a termination request (->terminate), respect it
237             if ( !schedule() )
238                 return;
239 
240             // wait for new events to process
241             aGuard.clear();
242             m_pImpl->aPendingActions.reset();
243             m_pImpl->aPendingActions.wait();
244         }
245         while ( sal_True );
246     }
247 
248     //--------------------------------------------------------------------
onTerminated()249     void SAL_CALL AsyncEventNotifier::onTerminated()
250     {
251         AsyncEventNotifier_TBASE::onTerminated();
252         // when we were started (->run), we acquired ourself. Release this now
253         // that we were finally terminated
254         release();
255     }
256 
257     //--------------------------------------------------------------------
acquire()258 	oslInterlockedCount SAL_CALL AsyncEventNotifier::acquire()
259     {
260         return osl_incrementInterlockedCount( &m_pImpl->m_refCount );
261     }
262 
263     //--------------------------------------------------------------------
release()264 	oslInterlockedCount SAL_CALL AsyncEventNotifier::release()
265     {
266         if ( 0 == osl_decrementInterlockedCount( &m_pImpl->m_refCount ) )
267         {
268             delete this;
269             return 0;
270         }
271         return m_pImpl->m_refCount;
272     }
273 
274 //........................................................................
275 } // namespace comphelper
276 //........................................................................
277 
278