• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2014 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/async_invoker.h"
12 
13 #include "rtc_base/checks.h"
14 #include "rtc_base/logging.h"
15 
16 namespace rtc {
17 
AsyncInvoker()18 AsyncInvoker::AsyncInvoker()
19     : pending_invocations_(0),
20       invocation_complete_(new RefCountedObject<Event>()),
21       destroying_(false) {}
22 
~AsyncInvoker()23 AsyncInvoker::~AsyncInvoker() {
24   destroying_.store(true, std::memory_order_relaxed);
25   // Messages for this need to be cleared *before* our destructor is complete.
26   ThreadManager::Clear(this);
27   // And we need to wait for any invocations that are still in progress on
28   // other threads. Using memory_order_acquire for synchronization with
29   // AsyncClosure destructors.
30   while (pending_invocations_.load(std::memory_order_acquire) > 0) {
31     // If the destructor was called while AsyncInvoke was being called by
32     // another thread, WITHIN an AsyncInvoked functor, it may do another
33     // Thread::Post even after we called ThreadManager::Clear(this). So
34     // we need to keep calling Clear to discard these posts.
35     Thread::Current()->Clear(this);
36     invocation_complete_->Wait(Event::kForever);
37   }
38 }
39 
OnMessage(Message * msg)40 void AsyncInvoker::OnMessage(Message* msg) {
41   // Get the AsyncClosure shared ptr from this message's data.
42   ScopedMessageData<AsyncClosure>* data =
43       static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
44   // Execute the closure and trigger the return message if needed.
45   data->inner_data().Execute();
46   delete data;
47 }
48 
Flush(Thread * thread,uint32_t id)49 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
50   // If the destructor is waiting for invocations to finish, don't start
51   // running even more tasks.
52   if (destroying_.load(std::memory_order_relaxed))
53     return;
54 
55   // Run this on |thread| to reduce the number of context switches.
56   if (Thread::Current() != thread) {
57     thread->Invoke<void>(RTC_FROM_HERE,
58                          Bind(&AsyncInvoker::Flush, this, thread, id));
59     return;
60   }
61 
62   MessageList removed;
63   thread->Clear(this, id, &removed);
64   for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
65     // This message was pending on this thread, so run it now.
66     thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata);
67   }
68 }
69 
Clear()70 void AsyncInvoker::Clear() {
71   ThreadManager::Clear(this);
72 }
73 
DoInvoke(const Location & posted_from,Thread * thread,std::unique_ptr<AsyncClosure> closure,uint32_t id)74 void AsyncInvoker::DoInvoke(const Location& posted_from,
75                             Thread* thread,
76                             std::unique_ptr<AsyncClosure> closure,
77                             uint32_t id) {
78   if (destroying_.load(std::memory_order_relaxed)) {
79     // Note that this may be expected, if the application is AsyncInvoking
80     // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
81     // between a thread destroying the AsyncInvoker and a thread still trying
82     // to use it.
83     RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
84     return;
85   }
86   thread->Post(posted_from, this, id,
87                new ScopedMessageData<AsyncClosure>(std::move(closure)));
88 }
89 
DoInvokeDelayed(const Location & posted_from,Thread * thread,std::unique_ptr<AsyncClosure> closure,uint32_t delay_ms,uint32_t id)90 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
91                                    Thread* thread,
92                                    std::unique_ptr<AsyncClosure> closure,
93                                    uint32_t delay_ms,
94                                    uint32_t id) {
95   if (destroying_.load(std::memory_order_relaxed)) {
96     // See above comment.
97     RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
98     return;
99   }
100   thread->PostDelayed(posted_from, delay_ms, this, id,
101                       new ScopedMessageData<AsyncClosure>(std::move(closure)));
102 }
103 
AsyncClosure(AsyncInvoker * invoker)104 AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
105     : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
106   invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
107 }
108 
~AsyncClosure()109 AsyncClosure::~AsyncClosure() {
110   // Using memory_order_release for synchronization with the AsyncInvoker
111   // destructor.
112   invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release);
113 
114   // After |pending_invocations_| is decremented, we may need to signal
115   // |invocation_complete_| in case the AsyncInvoker is being destroyed and
116   // waiting for pending tasks to complete.
117   //
118   // It's also possible that the destructor finishes before "Set()" is called,
119   // which is safe because the event is reference counted (and in a thread-safe
120   // way).
121   invocation_complete_->Set();
122 }
123 
124 }  // namespace rtc
125