/*
 *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10 #include "test/time_controller/external_time_controller.h"
11 
12 #include <algorithm>
13 #include <map>
14 #include <memory>
15 #include <utility>
16 
17 #include "api/task_queue/queued_task.h"
18 #include "api/task_queue/task_queue_base.h"
19 #include "api/task_queue/task_queue_factory.h"
20 #include "api/units/time_delta.h"
21 #include "api/units/timestamp.h"
22 #include "modules/include/module.h"
23 #include "modules/utility/include/process_thread.h"
24 #include "rtc_base/checks.h"
25 #include "rtc_base/synchronization/yield_policy.h"
26 #include "test/time_controller/simulated_time_controller.h"
27 
28 namespace webrtc {
29 
30 // Wraps a ProcessThread so that it can reschedule the time controller whenever
31 // an external call changes the ProcessThread's state.  For example, when a new
32 // module is registered, the ProcessThread may need to be called sooner than the
33 // time controller's currently-scheduled deadline.
34 class ExternalTimeController::ProcessThreadWrapper : public ProcessThread {
35  public:
ProcessThreadWrapper(ExternalTimeController * parent,std::unique_ptr<ProcessThread> thread)36   ProcessThreadWrapper(ExternalTimeController* parent,
37                        std::unique_ptr<ProcessThread> thread)
38       : parent_(parent), thread_(std::move(thread)) {}
39 
Start()40   void Start() override {
41     parent_->UpdateTime();
42     thread_->Start();
43     parent_->ScheduleNext();
44   }
45 
Stop()46   void Stop() override {
47     parent_->UpdateTime();
48     thread_->Stop();
49     parent_->ScheduleNext();
50   }
51 
WakeUp(Module * module)52   void WakeUp(Module* module) override {
53     parent_->UpdateTime();
54     thread_->WakeUp(GetWrapper(module));
55     parent_->ScheduleNext();
56   }
57 
PostTask(std::unique_ptr<QueuedTask> task)58   void PostTask(std::unique_ptr<QueuedTask> task) override {
59     parent_->UpdateTime();
60     thread_->PostTask(std::move(task));
61     parent_->ScheduleNext();
62   }
63 
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)64   void PostDelayedTask(std::unique_ptr<QueuedTask> task,
65                        uint32_t milliseconds) override {
66     parent_->UpdateTime();
67     thread_->PostDelayedTask(std::move(task), milliseconds);
68     parent_->ScheduleNext();
69   }
70 
RegisterModule(Module * module,const rtc::Location & from)71   void RegisterModule(Module* module, const rtc::Location& from) override {
72     parent_->UpdateTime();
73     module_wrappers_.emplace(module, new ModuleWrapper(module, this));
74     thread_->RegisterModule(GetWrapper(module), from);
75     parent_->ScheduleNext();
76   }
77 
DeRegisterModule(Module * module)78   void DeRegisterModule(Module* module) override {
79     parent_->UpdateTime();
80     thread_->DeRegisterModule(GetWrapper(module));
81     parent_->ScheduleNext();
82     module_wrappers_.erase(module);
83   }
84 
85  private:
86   class ModuleWrapper : public Module {
87    public:
ModuleWrapper(Module * module,ProcessThreadWrapper * thread)88     ModuleWrapper(Module* module, ProcessThreadWrapper* thread)
89         : module_(module), thread_(thread) {}
90 
TimeUntilNextProcess()91     int64_t TimeUntilNextProcess() override {
92       return module_->TimeUntilNextProcess();
93     }
94 
Process()95     void Process() override { module_->Process(); }
96 
ProcessThreadAttached(ProcessThread * process_thread)97     void ProcessThreadAttached(ProcessThread* process_thread) override {
98       if (process_thread) {
99         module_->ProcessThreadAttached(thread_);
100       } else {
101         module_->ProcessThreadAttached(nullptr);
102       }
103     }
104 
105    private:
106     Module* module_;
107     ProcessThreadWrapper* thread_;
108   };
109 
Delete()110   void Delete() override {
111     // ProcessThread shouldn't be deleted as a TaskQueue.
112     RTC_NOTREACHED();
113   }
114 
GetWrapper(Module * module)115   ModuleWrapper* GetWrapper(Module* module) {
116     auto it = module_wrappers_.find(module);
117     RTC_DCHECK(it != module_wrappers_.end());
118     return it->second.get();
119   }
120 
121   ExternalTimeController* const parent_;
122   std::unique_ptr<ProcessThread> thread_;
123   std::map<Module*, std::unique_ptr<ModuleWrapper>> module_wrappers_;
124 };
125 
126 // Wraps a TaskQueue so that it can reschedule the time controller whenever
127 // an external call schedules a new task.
128 class ExternalTimeController::TaskQueueWrapper : public TaskQueueBase {
129  public:
TaskQueueWrapper(ExternalTimeController * parent,std::unique_ptr<TaskQueueBase,TaskQueueDeleter> base)130   TaskQueueWrapper(ExternalTimeController* parent,
131                    std::unique_ptr<TaskQueueBase, TaskQueueDeleter> base)
132       : parent_(parent), base_(std::move(base)) {}
133 
PostTask(std::unique_ptr<QueuedTask> task)134   void PostTask(std::unique_ptr<QueuedTask> task) override {
135     parent_->UpdateTime();
136     base_->PostTask(std::make_unique<TaskWrapper>(std::move(task), this));
137     parent_->ScheduleNext();
138   }
139 
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t ms)140   void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t ms) override {
141     parent_->UpdateTime();
142     base_->PostDelayedTask(std::make_unique<TaskWrapper>(std::move(task), this),
143                            ms);
144     parent_->ScheduleNext();
145   }
146 
Delete()147   void Delete() override { delete this; }
148 
149  private:
150   class TaskWrapper : public QueuedTask {
151    public:
TaskWrapper(std::unique_ptr<QueuedTask> task,TaskQueueWrapper * queue)152     TaskWrapper(std::unique_ptr<QueuedTask> task, TaskQueueWrapper* queue)
153         : task_(std::move(task)), queue_(queue) {}
154 
Run()155     bool Run() override {
156       CurrentTaskQueueSetter current(queue_);
157       if (!task_->Run()) {
158         task_.release();
159       }
160       // The wrapper should always be deleted, even if it releases the inner
161       // task, in order to avoid leaking wrappers.
162       return true;
163     }
164 
165    private:
166     std::unique_ptr<QueuedTask> task_;
167     TaskQueueWrapper* queue_;
168   };
169 
170   ExternalTimeController* const parent_;
171   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> base_;
172 };
173 
ExternalTimeController(ControlledAlarmClock * alarm)174 ExternalTimeController::ExternalTimeController(ControlledAlarmClock* alarm)
175     : alarm_(alarm),
176       impl_(alarm_->GetClock()->CurrentTime()),
177       yield_policy_(&impl_) {
178   global_clock_.SetTime(alarm_->GetClock()->CurrentTime());
179   alarm_->SetCallback([this] { Run(); });
180 }
181 
GetClock()182 Clock* ExternalTimeController::GetClock() {
183   return alarm_->GetClock();
184 }
185 
GetTaskQueueFactory()186 TaskQueueFactory* ExternalTimeController::GetTaskQueueFactory() {
187   return this;
188 }
189 
CreateProcessThread(const char * thread_name)190 std::unique_ptr<ProcessThread> ExternalTimeController::CreateProcessThread(
191     const char* thread_name) {
192   return std::make_unique<ProcessThreadWrapper>(
193       this, impl_.CreateProcessThread(thread_name));
194 }
195 
AdvanceTime(TimeDelta duration)196 void ExternalTimeController::AdvanceTime(TimeDelta duration) {
197   alarm_->Sleep(duration);
198 }
199 
CreateThread(const std::string & name,std::unique_ptr<rtc::SocketServer> socket_server)200 std::unique_ptr<rtc::Thread> ExternalTimeController::CreateThread(
201     const std::string& name,
202     std::unique_ptr<rtc::SocketServer> socket_server) {
203   RTC_NOTREACHED();
204   return nullptr;
205 }
206 
GetMainThread()207 rtc::Thread* ExternalTimeController::GetMainThread() {
208   RTC_NOTREACHED();
209   return nullptr;
210 }
211 
212 std::unique_ptr<TaskQueueBase, TaskQueueDeleter>
CreateTaskQueue(absl::string_view name,TaskQueueFactory::Priority priority) const213 ExternalTimeController::CreateTaskQueue(
214     absl::string_view name,
215     TaskQueueFactory::Priority priority) const {
216   return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
217       new TaskQueueWrapper(const_cast<ExternalTimeController*>(this),
218                            impl_.CreateTaskQueue(name, priority)));
219 }
220 
Run()221 void ExternalTimeController::Run() {
222   rtc::ScopedYieldPolicy yield_policy(&impl_);
223   UpdateTime();
224   impl_.RunReadyRunners();
225   ScheduleNext();
226 }
227 
UpdateTime()228 void ExternalTimeController::UpdateTime() {
229   Timestamp now = alarm_->GetClock()->CurrentTime();
230   impl_.AdvanceTime(now);
231   global_clock_.SetTime(now);
232 }
233 
ScheduleNext()234 void ExternalTimeController::ScheduleNext() {
235   RTC_DCHECK_EQ(impl_.CurrentTime(), alarm_->GetClock()->CurrentTime());
236   TimeDelta delay =
237       std::max(impl_.NextRunTime() - impl_.CurrentTime(), TimeDelta::Zero());
238   if (delay.IsFinite()) {
239     alarm_->ScheduleAlarmAt(alarm_->GetClock()->CurrentTime() + delay);
240   }
241 }
242 
243 }  // namespace webrtc
244