• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/utility/source/process_thread_impl.h"
12 
13 #include <string>
14 
15 #include "modules/include/module.h"
16 #include "rtc_base/checks.h"
17 #include "rtc_base/logging.h"
18 #include "rtc_base/time_utils.h"
19 #include "rtc_base/trace_event.h"
20 
21 namespace webrtc {
22 namespace {
23 
24 // We use this constant internally to signal that a module has requested
25 // a callback right away.  When this is set, no call to TimeUntilNextProcess
26 // should be made, but Process() should be called directly.
27 const int64_t kCallProcessImmediately = -1;
28 
GetNextCallbackTime(Module * module,int64_t time_now)29 int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
30   int64_t interval = module->TimeUntilNextProcess();
31   if (interval < 0) {
32     // Falling behind, we should call the callback now.
33     return time_now;
34   }
35   return time_now + interval;
36 }
37 }  // namespace
38 
~ProcessThread()39 ProcessThread::~ProcessThread() {}
40 
41 // static
Create(const char * thread_name)42 std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
43   return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
44 }
45 
ProcessThreadImpl(const char * thread_name)46 ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
47     : stop_(false), thread_name_(thread_name) {}
48 
~ProcessThreadImpl()49 ProcessThreadImpl::~ProcessThreadImpl() {
50   RTC_DCHECK(thread_checker_.IsCurrent());
51   RTC_DCHECK(!thread_.get());
52   RTC_DCHECK(!stop_);
53 
54   while (!delayed_tasks_.empty()) {
55     delete delayed_tasks_.top().task;
56     delayed_tasks_.pop();
57   }
58 
59   while (!queue_.empty()) {
60     delete queue_.front();
61     queue_.pop();
62   }
63 }
64 
Delete()65 void ProcessThreadImpl::Delete() {
66   RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
67                       << " is destroyed as a TaskQueue.";
68   Stop();
69   delete this;
70 }
71 
Start()72 void ProcessThreadImpl::Start() {
73   RTC_DCHECK(thread_checker_.IsCurrent());
74   RTC_DCHECK(!thread_.get());
75   if (thread_.get())
76     return;
77 
78   RTC_DCHECK(!stop_);
79 
80   for (ModuleCallback& m : modules_)
81     m.module->ProcessThreadAttached(this);
82 
83   thread_.reset(
84       new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
85   thread_->Start();
86 }
87 
Stop()88 void ProcessThreadImpl::Stop() {
89   RTC_DCHECK(thread_checker_.IsCurrent());
90   if (!thread_.get())
91     return;
92 
93   {
94     rtc::CritScope lock(&lock_);
95     stop_ = true;
96   }
97 
98   wake_up_.Set();
99 
100   thread_->Stop();
101   stop_ = false;
102 
103   thread_.reset();
104   for (ModuleCallback& m : modules_)
105     m.module->ProcessThreadAttached(nullptr);
106 }
107 
WakeUp(Module * module)108 void ProcessThreadImpl::WakeUp(Module* module) {
109   // Allowed to be called on any thread.
110   {
111     rtc::CritScope lock(&lock_);
112     for (ModuleCallback& m : modules_) {
113       if (m.module == module)
114         m.next_callback = kCallProcessImmediately;
115     }
116   }
117   wake_up_.Set();
118 }
119 
PostTask(std::unique_ptr<QueuedTask> task)120 void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
121   // Allowed to be called on any thread.
122   {
123     rtc::CritScope lock(&lock_);
124     queue_.push(task.release());
125   }
126   wake_up_.Set();
127 }
128 
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)129 void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
130                                         uint32_t milliseconds) {
131   int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
132   bool recalculate_wakeup_time;
133   {
134     rtc::CritScope lock(&lock_);
135     recalculate_wakeup_time =
136         delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
137     delayed_tasks_.emplace(run_at_ms, std::move(task));
138   }
139   if (recalculate_wakeup_time) {
140     wake_up_.Set();
141   }
142 }
143 
RegisterModule(Module * module,const rtc::Location & from)144 void ProcessThreadImpl::RegisterModule(Module* module,
145                                        const rtc::Location& from) {
146   RTC_DCHECK(thread_checker_.IsCurrent());
147   RTC_DCHECK(module) << from.ToString();
148 
149 #if RTC_DCHECK_IS_ON
150   {
151     // Catch programmer error.
152     rtc::CritScope lock(&lock_);
153     for (const ModuleCallback& mc : modules_) {
154       RTC_DCHECK(mc.module != module)
155           << "Already registered here: " << mc.location.ToString()
156           << "\n"
157              "Now attempting from here: "
158           << from.ToString();
159     }
160   }
161 #endif
162 
163   // Now that we know the module isn't in the list, we'll call out to notify
164   // the module that it's attached to the worker thread.  We don't hold
165   // the lock while we make this call.
166   if (thread_.get())
167     module->ProcessThreadAttached(this);
168 
169   {
170     rtc::CritScope lock(&lock_);
171     modules_.push_back(ModuleCallback(module, from));
172   }
173 
174   // Wake the thread calling ProcessThreadImpl::Process() to update the
175   // waiting time. The waiting time for the just registered module may be
176   // shorter than all other registered modules.
177   wake_up_.Set();
178 }
179 
DeRegisterModule(Module * module)180 void ProcessThreadImpl::DeRegisterModule(Module* module) {
181   RTC_DCHECK(thread_checker_.IsCurrent());
182   RTC_DCHECK(module);
183 
184   {
185     rtc::CritScope lock(&lock_);
186     modules_.remove_if(
187         [&module](const ModuleCallback& m) { return m.module == module; });
188   }
189 
190   // Notify the module that it's been detached.
191   module->ProcessThreadAttached(nullptr);
192 }
193 
194 // static
Run(void * obj)195 void ProcessThreadImpl::Run(void* obj) {
196   ProcessThreadImpl* impl = static_cast<ProcessThreadImpl*>(obj);
197   CurrentTaskQueueSetter set_current(impl);
198   while (impl->Process()) {
199   }
200 }
201 
Process()202 bool ProcessThreadImpl::Process() {
203   TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
204   int64_t now = rtc::TimeMillis();
205   int64_t next_checkpoint = now + (1000 * 60);
206 
207   {
208     rtc::CritScope lock(&lock_);
209     if (stop_)
210       return false;
211     for (ModuleCallback& m : modules_) {
212       // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
213       // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
214       // operation should not require taking a lock, so querying all modules
215       // should run in a matter of nanoseconds.
216       if (m.next_callback == 0)
217         m.next_callback = GetNextCallbackTime(m.module, now);
218 
219       if (m.next_callback <= now ||
220           m.next_callback == kCallProcessImmediately) {
221         {
222           TRACE_EVENT2("webrtc", "ModuleProcess", "function",
223                        m.location.function_name(), "file",
224                        m.location.file_name());
225           m.module->Process();
226         }
227         // Use a new 'now' reference to calculate when the next callback
228         // should occur.  We'll continue to use 'now' above for the baseline
229         // of calculating how long we should wait, to reduce variance.
230         int64_t new_now = rtc::TimeMillis();
231         m.next_callback = GetNextCallbackTime(m.module, new_now);
232       }
233 
234       if (m.next_callback < next_checkpoint)
235         next_checkpoint = m.next_callback;
236     }
237 
238     while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
239       queue_.push(delayed_tasks_.top().task);
240       delayed_tasks_.pop();
241     }
242 
243     if (!delayed_tasks_.empty()) {
244       next_checkpoint =
245           std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
246     }
247 
248     while (!queue_.empty()) {
249       QueuedTask* task = queue_.front();
250       queue_.pop();
251       lock_.Leave();
252       if (task->Run()) {
253         delete task;
254       }
255       lock_.Enter();
256     }
257   }
258 
259   int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
260   if (time_to_wait > 0)
261     wake_up_.Wait(static_cast<int>(time_to_wait));
262 
263   return true;
264 }
265 }  // namespace webrtc
266