1 /*
2 * Copyright 2014 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/async_invoker.h"
12
13 #include "rtc_base/checks.h"
14 #include "rtc_base/logging.h"
15
16 namespace rtc {
17
AsyncInvoker()18 AsyncInvoker::AsyncInvoker()
19 : pending_invocations_(0),
20 invocation_complete_(new RefCountedObject<Event>()),
21 destroying_(false) {}
22
~AsyncInvoker()23 AsyncInvoker::~AsyncInvoker() {
24 destroying_.store(true, std::memory_order_relaxed);
25 // Messages for this need to be cleared *before* our destructor is complete.
26 ThreadManager::Clear(this);
27 // And we need to wait for any invocations that are still in progress on
28 // other threads. Using memory_order_acquire for synchronization with
29 // AsyncClosure destructors.
30 while (pending_invocations_.load(std::memory_order_acquire) > 0) {
31 // If the destructor was called while AsyncInvoke was being called by
32 // another thread, WITHIN an AsyncInvoked functor, it may do another
33 // Thread::Post even after we called ThreadManager::Clear(this). So
34 // we need to keep calling Clear to discard these posts.
35 Thread::Current()->Clear(this);
36 invocation_complete_->Wait(Event::kForever);
37 }
38 }
39
OnMessage(Message * msg)40 void AsyncInvoker::OnMessage(Message* msg) {
41 // Get the AsyncClosure shared ptr from this message's data.
42 ScopedMessageData<AsyncClosure>* data =
43 static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
44 // Execute the closure and trigger the return message if needed.
45 data->inner_data().Execute();
46 delete data;
47 }
48
Flush(Thread * thread,uint32_t id)49 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
50 // If the destructor is waiting for invocations to finish, don't start
51 // running even more tasks.
52 if (destroying_.load(std::memory_order_relaxed))
53 return;
54
55 // Run this on |thread| to reduce the number of context switches.
56 if (Thread::Current() != thread) {
57 thread->Invoke<void>(RTC_FROM_HERE,
58 Bind(&AsyncInvoker::Flush, this, thread, id));
59 return;
60 }
61
62 MessageList removed;
63 thread->Clear(this, id, &removed);
64 for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
65 // This message was pending on this thread, so run it now.
66 thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata);
67 }
68 }
69
Clear()70 void AsyncInvoker::Clear() {
71 ThreadManager::Clear(this);
72 }
73
DoInvoke(const Location & posted_from,Thread * thread,std::unique_ptr<AsyncClosure> closure,uint32_t id)74 void AsyncInvoker::DoInvoke(const Location& posted_from,
75 Thread* thread,
76 std::unique_ptr<AsyncClosure> closure,
77 uint32_t id) {
78 if (destroying_.load(std::memory_order_relaxed)) {
79 // Note that this may be expected, if the application is AsyncInvoking
80 // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
81 // between a thread destroying the AsyncInvoker and a thread still trying
82 // to use it.
83 RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
84 return;
85 }
86 thread->Post(posted_from, this, id,
87 new ScopedMessageData<AsyncClosure>(std::move(closure)));
88 }
89
DoInvokeDelayed(const Location & posted_from,Thread * thread,std::unique_ptr<AsyncClosure> closure,uint32_t delay_ms,uint32_t id)90 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
91 Thread* thread,
92 std::unique_ptr<AsyncClosure> closure,
93 uint32_t delay_ms,
94 uint32_t id) {
95 if (destroying_.load(std::memory_order_relaxed)) {
96 // See above comment.
97 RTC_LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
98 return;
99 }
100 thread->PostDelayed(posted_from, delay_ms, this, id,
101 new ScopedMessageData<AsyncClosure>(std::move(closure)));
102 }
103
AsyncClosure(AsyncInvoker * invoker)104 AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
105 : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
106 invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
107 }
108
~AsyncClosure()109 AsyncClosure::~AsyncClosure() {
110 // Using memory_order_release for synchronization with the AsyncInvoker
111 // destructor.
112 invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release);
113
114 // After |pending_invocations_| is decremented, we may need to signal
115 // |invocation_complete_| in case the AsyncInvoker is being destroyed and
116 // waiting for pending tasks to complete.
117 //
118 // It's also possible that the destructor finishes before "Set()" is called,
119 // which is safe because the event is reference counted (and in a thread-safe
120 // way).
121 invocation_complete_->Set();
122 }
123
124 } // namespace rtc
125