1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/utility/source/process_thread_impl.h"
12
13 #include <string>
14
15 #include "modules/include/module.h"
16 #include "rtc_base/checks.h"
17 #include "rtc_base/logging.h"
18 #include "rtc_base/time_utils.h"
19 #include "rtc_base/trace_event.h"
20
21 namespace webrtc {
22 namespace {
23
// Sentinel stored in a module's next-callback slot to record that the module
// has asked to be called back right away. While this value is set,
// TimeUntilNextProcess() should not be queried; Process() should be invoked
// directly instead.
constexpr int64_t kCallProcessImmediately = -1;
28
GetNextCallbackTime(Module * module,int64_t time_now)29 int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
30 int64_t interval = module->TimeUntilNextProcess();
31 if (interval < 0) {
32 // Falling behind, we should call the callback now.
33 return time_now;
34 }
35 return time_now + interval;
36 }
37 } // namespace
38
~ProcessThread()39 ProcessThread::~ProcessThread() {}
40
41 // static
Create(const char * thread_name)42 std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
43 return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
44 }
45
ProcessThreadImpl(const char * thread_name)46 ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
47 : stop_(false), thread_name_(thread_name) {}
48
~ProcessThreadImpl()49 ProcessThreadImpl::~ProcessThreadImpl() {
50 RTC_DCHECK(thread_checker_.IsCurrent());
51 RTC_DCHECK(!thread_.get());
52 RTC_DCHECK(!stop_);
53
54 while (!delayed_tasks_.empty()) {
55 delete delayed_tasks_.top().task;
56 delayed_tasks_.pop();
57 }
58
59 while (!queue_.empty()) {
60 delete queue_.front();
61 queue_.pop();
62 }
63 }
64
Delete()65 void ProcessThreadImpl::Delete() {
66 RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
67 << " is destroyed as a TaskQueue.";
68 Stop();
69 delete this;
70 }
71
Start()72 void ProcessThreadImpl::Start() {
73 RTC_DCHECK(thread_checker_.IsCurrent());
74 RTC_DCHECK(!thread_.get());
75 if (thread_.get())
76 return;
77
78 RTC_DCHECK(!stop_);
79
80 for (ModuleCallback& m : modules_)
81 m.module->ProcessThreadAttached(this);
82
83 thread_.reset(
84 new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
85 thread_->Start();
86 }
87
Stop()88 void ProcessThreadImpl::Stop() {
89 RTC_DCHECK(thread_checker_.IsCurrent());
90 if (!thread_.get())
91 return;
92
93 {
94 rtc::CritScope lock(&lock_);
95 stop_ = true;
96 }
97
98 wake_up_.Set();
99
100 thread_->Stop();
101 stop_ = false;
102
103 thread_.reset();
104 for (ModuleCallback& m : modules_)
105 m.module->ProcessThreadAttached(nullptr);
106 }
107
WakeUp(Module * module)108 void ProcessThreadImpl::WakeUp(Module* module) {
109 // Allowed to be called on any thread.
110 {
111 rtc::CritScope lock(&lock_);
112 for (ModuleCallback& m : modules_) {
113 if (m.module == module)
114 m.next_callback = kCallProcessImmediately;
115 }
116 }
117 wake_up_.Set();
118 }
119
PostTask(std::unique_ptr<QueuedTask> task)120 void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
121 // Allowed to be called on any thread.
122 {
123 rtc::CritScope lock(&lock_);
124 queue_.push(task.release());
125 }
126 wake_up_.Set();
127 }
128
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)129 void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
130 uint32_t milliseconds) {
131 int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
132 bool recalculate_wakeup_time;
133 {
134 rtc::CritScope lock(&lock_);
135 recalculate_wakeup_time =
136 delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
137 delayed_tasks_.emplace(run_at_ms, std::move(task));
138 }
139 if (recalculate_wakeup_time) {
140 wake_up_.Set();
141 }
142 }
143
RegisterModule(Module * module,const rtc::Location & from)144 void ProcessThreadImpl::RegisterModule(Module* module,
145 const rtc::Location& from) {
146 RTC_DCHECK(thread_checker_.IsCurrent());
147 RTC_DCHECK(module) << from.ToString();
148
149 #if RTC_DCHECK_IS_ON
150 {
151 // Catch programmer error.
152 rtc::CritScope lock(&lock_);
153 for (const ModuleCallback& mc : modules_) {
154 RTC_DCHECK(mc.module != module)
155 << "Already registered here: " << mc.location.ToString()
156 << "\n"
157 "Now attempting from here: "
158 << from.ToString();
159 }
160 }
161 #endif
162
163 // Now that we know the module isn't in the list, we'll call out to notify
164 // the module that it's attached to the worker thread. We don't hold
165 // the lock while we make this call.
166 if (thread_.get())
167 module->ProcessThreadAttached(this);
168
169 {
170 rtc::CritScope lock(&lock_);
171 modules_.push_back(ModuleCallback(module, from));
172 }
173
174 // Wake the thread calling ProcessThreadImpl::Process() to update the
175 // waiting time. The waiting time for the just registered module may be
176 // shorter than all other registered modules.
177 wake_up_.Set();
178 }
179
DeRegisterModule(Module * module)180 void ProcessThreadImpl::DeRegisterModule(Module* module) {
181 RTC_DCHECK(thread_checker_.IsCurrent());
182 RTC_DCHECK(module);
183
184 {
185 rtc::CritScope lock(&lock_);
186 modules_.remove_if(
187 [&module](const ModuleCallback& m) { return m.module == module; });
188 }
189
190 // Notify the module that it's been detached.
191 module->ProcessThreadAttached(nullptr);
192 }
193
194 // static
Run(void * obj)195 void ProcessThreadImpl::Run(void* obj) {
196 ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj);
197 CurrentTaskQueueSetter set_current(impl);
198 while (impl->Process()) {
199 }
200 }
201
Process()202 bool ProcessThreadImpl::Process() {
203 TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
204 int64_t now = rtc::TimeMillis();
205 int64_t next_checkpoint = now + (1000 * 60);
206
207 {
208 rtc::CritScope lock(&lock_);
209 if (stop_)
210 return false;
211 for (ModuleCallback& m : modules_) {
212 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
213 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
214 // operation should not require taking a lock, so querying all modules
215 // should run in a matter of nanoseconds.
216 if (m.next_callback == 0)
217 m.next_callback = GetNextCallbackTime(m.module, now);
218
219 if (m.next_callback <= now ||
220 m.next_callback == kCallProcessImmediately) {
221 {
222 TRACE_EVENT2("webrtc", "ModuleProcess", "function",
223 m.location.function_name(), "file",
224 m.location.file_name());
225 m.module->Process();
226 }
227 // Use a new 'now' reference to calculate when the next callback
228 // should occur. We'll continue to use 'now' above for the baseline
229 // of calculating how long we should wait, to reduce variance.
230 int64_t new_now = rtc::TimeMillis();
231 m.next_callback = GetNextCallbackTime(m.module, new_now);
232 }
233
234 if (m.next_callback < next_checkpoint)
235 next_checkpoint = m.next_callback;
236 }
237
238 while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
239 queue_.push(delayed_tasks_.top().task);
240 delayed_tasks_.pop();
241 }
242
243 if (!delayed_tasks_.empty()) {
244 next_checkpoint =
245 std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
246 }
247
248 while (!queue_.empty()) {
249 QueuedTask* task = queue_.front();
250 queue_.pop();
251 lock_.Leave();
252 if (task->Run()) {
253 delete task;
254 }
255 lock_.Enter();
256 }
257 }
258
259 int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
260 if (time_to_wait > 0)
261 wake_up_.Wait(static_cast<int>(time_to_wait));
262
263 return true;
264 }
265 } // namespace webrtc
266