• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/modules/utility/source/process_thread_impl.h"
12 
13 #include "webrtc/base/checks.h"
14 #include "webrtc/modules/include/module.h"
15 #include "webrtc/system_wrappers/include/logging.h"
16 #include "webrtc/system_wrappers/include/tick_util.h"
17 
18 namespace webrtc {
19 namespace {
20 
21 // We use this constant internally to signal that a module has requested
22 // a callback right away.  When this is set, no call to TimeUntilNextProcess
23 // should be made, but Process() should be called directly.
24 const int64_t kCallProcessImmediately = -1;
25 
GetNextCallbackTime(Module * module,int64_t time_now)26 int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
27   int64_t interval = module->TimeUntilNextProcess();
28   if (interval < 0) {
29     // Falling behind, we should call the callback now.
30     return time_now;
31   }
32   return time_now + interval;
33 }
34 }
35 
~ProcessThread()36 ProcessThread::~ProcessThread() {}
37 
38 // static
Create(const char * thread_name)39 rtc::scoped_ptr<ProcessThread> ProcessThread::Create(
40     const char* thread_name) {
41   return rtc::scoped_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
42 }
43 
ProcessThreadImpl(const char * thread_name)44 ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
45     : wake_up_(EventWrapper::Create()),
46       stop_(false),
47       thread_name_(thread_name) {}
48 
~ProcessThreadImpl()49 ProcessThreadImpl::~ProcessThreadImpl() {
50   RTC_DCHECK(thread_checker_.CalledOnValidThread());
51   RTC_DCHECK(!thread_.get());
52   RTC_DCHECK(!stop_);
53 
54   while (!queue_.empty()) {
55     delete queue_.front();
56     queue_.pop();
57   }
58 }
59 
Start()60 void ProcessThreadImpl::Start() {
61   RTC_DCHECK(thread_checker_.CalledOnValidThread());
62   RTC_DCHECK(!thread_.get());
63   if (thread_.get())
64     return;
65 
66   RTC_DCHECK(!stop_);
67 
68   {
69     // TODO(tommi): Since DeRegisterModule is currently being called from
70     // different threads in some cases (ChannelOwner), we need to lock access to
71     // the modules_ collection even on the controller thread.
72     // Once we've cleaned up those places, we can remove this lock.
73     rtc::CritScope lock(&lock_);
74     for (ModuleCallback& m : modules_)
75       m.module->ProcessThreadAttached(this);
76   }
77 
78   thread_.reset(
79       new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
80   thread_->Start();
81 }
82 
Stop()83 void ProcessThreadImpl::Stop() {
84   RTC_DCHECK(thread_checker_.CalledOnValidThread());
85   if(!thread_.get())
86     return;
87 
88   {
89     rtc::CritScope lock(&lock_);
90     stop_ = true;
91   }
92 
93   wake_up_->Set();
94 
95   thread_->Stop();
96   stop_ = false;
97 
98   // TODO(tommi): Since DeRegisterModule is currently being called from
99   // different threads in some cases (ChannelOwner), we need to lock access to
100   // the modules_ collection even on the controller thread.
101   // Since DeRegisterModule also checks thread_, we also need to hold the
102   // lock for the .reset() operation.
103   // Once we've cleaned up those places, we can remove this lock.
104   rtc::CritScope lock(&lock_);
105   thread_.reset();
106   for (ModuleCallback& m : modules_)
107     m.module->ProcessThreadAttached(nullptr);
108 }
109 
WakeUp(Module * module)110 void ProcessThreadImpl::WakeUp(Module* module) {
111   // Allowed to be called on any thread.
112   {
113     rtc::CritScope lock(&lock_);
114     for (ModuleCallback& m : modules_) {
115       if (m.module == module)
116         m.next_callback = kCallProcessImmediately;
117     }
118   }
119   wake_up_->Set();
120 }
121 
PostTask(rtc::scoped_ptr<ProcessTask> task)122 void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
123   // Allowed to be called on any thread.
124   {
125     rtc::CritScope lock(&lock_);
126     queue_.push(task.release());
127   }
128   wake_up_->Set();
129 }
130 
RegisterModule(Module * module)131 void ProcessThreadImpl::RegisterModule(Module* module) {
132   RTC_DCHECK(thread_checker_.CalledOnValidThread());
133   RTC_DCHECK(module);
134 
135 #if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
136   {
137     // Catch programmer error.
138     rtc::CritScope lock(&lock_);
139     for (const ModuleCallback& mc : modules_)
140       RTC_DCHECK(mc.module != module);
141   }
142 #endif
143 
144   // Now that we know the module isn't in the list, we'll call out to notify
145   // the module that it's attached to the worker thread.  We don't hold
146   // the lock while we make this call.
147   if (thread_.get())
148     module->ProcessThreadAttached(this);
149 
150   {
151     rtc::CritScope lock(&lock_);
152     modules_.push_back(ModuleCallback(module));
153   }
154 
155   // Wake the thread calling ProcessThreadImpl::Process() to update the
156   // waiting time. The waiting time for the just registered module may be
157   // shorter than all other registered modules.
158   wake_up_->Set();
159 }
160 
DeRegisterModule(Module * module)161 void ProcessThreadImpl::DeRegisterModule(Module* module) {
162   // Allowed to be called on any thread.
163   // TODO(tommi): Disallow this ^^^
164   RTC_DCHECK(module);
165 
166   {
167     rtc::CritScope lock(&lock_);
168     modules_.remove_if([&module](const ModuleCallback& m) {
169         return m.module == module;
170       });
171 
172     // TODO(tommi): we currently need to hold the lock while calling out to
173     // ProcessThreadAttached.  This is to make sure that the thread hasn't been
174     // destroyed while we attach the module.  Once we can make sure
175     // DeRegisterModule isn't being called on arbitrary threads, we can move the
176     // |if (thread_.get())| check and ProcessThreadAttached() call outside the
177     // lock scope.
178 
179     // Notify the module that it's been detached.
180     if (thread_.get())
181       module->ProcessThreadAttached(nullptr);
182   }
183 }
184 
185 // static
Run(void * obj)186 bool ProcessThreadImpl::Run(void* obj) {
187   return static_cast<ProcessThreadImpl*>(obj)->Process();
188 }
189 
Process()190 bool ProcessThreadImpl::Process() {
191   int64_t now = TickTime::MillisecondTimestamp();
192   int64_t next_checkpoint = now + (1000 * 60);
193 
194   {
195     rtc::CritScope lock(&lock_);
196     if (stop_)
197       return false;
198     for (ModuleCallback& m : modules_) {
199       // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
200       // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
201       // operation should not require taking a lock, so querying all modules
202       // should run in a matter of nanoseconds.
203       if (m.next_callback == 0)
204         m.next_callback = GetNextCallbackTime(m.module, now);
205 
206       if (m.next_callback <= now ||
207           m.next_callback == kCallProcessImmediately) {
208         m.module->Process();
209         // Use a new 'now' reference to calculate when the next callback
210         // should occur.  We'll continue to use 'now' above for the baseline
211         // of calculating how long we should wait, to reduce variance.
212         int64_t new_now = TickTime::MillisecondTimestamp();
213         m.next_callback = GetNextCallbackTime(m.module, new_now);
214       }
215 
216       if (m.next_callback < next_checkpoint)
217         next_checkpoint = m.next_callback;
218     }
219 
220     while (!queue_.empty()) {
221       ProcessTask* task = queue_.front();
222       queue_.pop();
223       lock_.Leave();
224       task->Run();
225       delete task;
226       lock_.Enter();
227     }
228   }
229 
230   int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
231   if (time_to_wait > 0)
232     wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
233 
234   return true;
235 }
236 }  // namespace webrtc
237