1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/base/plugin_thread_task_runner.h"
6
7 #include "base/bind.h"
8
9 namespace {
10
CalcTimeDelta(base::TimeTicks when)11 base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
12 return std::max(when - base::TimeTicks::Now(), base::TimeDelta());
13 }
14
15 } // namespace
16
17 namespace remoting {
18
~Delegate()19 PluginThreadTaskRunner::Delegate::~Delegate() {
20 }
21
PluginThreadTaskRunner(Delegate * delegate)22 PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
23 : plugin_thread_id_(base::PlatformThread::CurrentId()),
24 event_(false, false),
25 delegate_(delegate),
26 next_sequence_num_(0),
27 quit_received_(false),
28 stopped_(false) {
29 }
30
~PluginThreadTaskRunner()31 PluginThreadTaskRunner::~PluginThreadTaskRunner() {
32 DCHECK(delegate_ == NULL);
33 DCHECK(stopped_);
34 }
35
DetachAndRunShutdownLoop()36 void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
37 DCHECK(BelongsToCurrentThread());
38
39 // Detach from the plugin thread and redirect all tasks posted after this
40 // point to the shutdown task loop.
41 {
42 base::AutoLock auto_lock(lock_);
43
44 DCHECK(delegate_ != NULL);
45 DCHECK(!stopped_);
46
47 delegate_ = NULL;
48 stopped_ = quit_received_;
49 }
50
51 // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
52 // timers are cancelled. It is OK to clear |scheduled_timers_| even if
53 // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
54 // called before NPP_Destroy()).
55 scheduled_timers_.clear();
56
57 // Run all tasks that are due.
58 ProcessIncomingTasks();
59 RunDueTasks(base::TimeTicks::Now());
60
61 while (!stopped_) {
62 if (delayed_queue_.empty()) {
63 event_.Wait();
64 } else {
65 event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
66 }
67
68 // Run all tasks that are due.
69 ProcessIncomingTasks();
70 RunDueTasks(base::TimeTicks::Now());
71
72 base::AutoLock auto_lock(lock_);
73 stopped_ = quit_received_;
74 }
75 }
76
Quit()77 void PluginThreadTaskRunner::Quit() {
78 base::AutoLock auto_lock(lock_);
79
80 if (!quit_received_) {
81 quit_received_ = true;
82 event_.Signal();
83 }
84 }
85
PostDelayedTask(const tracked_objects::Location & from_here,const base::Closure & task,base::TimeDelta delay)86 bool PluginThreadTaskRunner::PostDelayedTask(
87 const tracked_objects::Location& from_here,
88 const base::Closure& task,
89 base::TimeDelta delay) {
90
91 // Wrap the task into |base::PendingTask|.
92 base::TimeTicks delayed_run_time;
93 if (delay > base::TimeDelta()) {
94 delayed_run_time = base::TimeTicks::Now() + delay;
95 } else {
96 DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
97 }
98
99 base::PendingTask pending_task(from_here, task, delayed_run_time, false);
100
101 // Push the task to the incoming queue.
102 base::AutoLock locked(lock_);
103
104 // Initialize the sequence number. The sequence number provides FIFO ordering
105 // for tasks with the same |delayed_run_time|.
106 pending_task.sequence_num = next_sequence_num_++;
107
108 // Post an asynchronous call on the plugin thread to process the task.
109 if (incoming_queue_.empty()) {
110 PostRunTasks();
111 }
112
113 incoming_queue_.push(pending_task);
114 pending_task.task.Reset();
115
116 // No tasks should be posted after Quit() has been called.
117 DCHECK(!quit_received_);
118 return true;
119 }
120
PostNonNestableDelayedTask(const tracked_objects::Location & from_here,const base::Closure & task,base::TimeDelta delay)121 bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
122 const tracked_objects::Location& from_here,
123 const base::Closure& task,
124 base::TimeDelta delay) {
125 // All tasks running on this task loop are non-nestable.
126 return PostDelayedTask(from_here, task, delay);
127 }
128
RunsTasksOnCurrentThread() const129 bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
130 // In pepper plugins ideally we should use pp::Core::IsMainThread,
131 // but it is problematic because we would need to keep reference to
132 // Core somewhere, e.g. make the delegate ref-counted.
133 return base::PlatformThread::CurrentId() == plugin_thread_id_;
134 }
135
PostRunTasks()136 void PluginThreadTaskRunner::PostRunTasks() {
137 // Post tasks to the plugin thread when it is availabe or spin the shutdown
138 // task loop.
139 if (delegate_ != NULL) {
140 base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
141 delegate_->RunOnPluginThread(
142 base::TimeDelta(),
143 &PluginThreadTaskRunner::TaskSpringboard,
144 new base::Closure(closure));
145 } else {
146 event_.Signal();
147 }
148 }
149
PostDelayedRunTasks(base::TimeTicks when)150 void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
151 DCHECK(BelongsToCurrentThread());
152
153 // |delegate_| is updated from the plugin thread only, so it is safe to access
154 // it here without taking the lock.
155 if (delegate_ != NULL) {
156 // Schedule RunDelayedTasks() to be called at |when| if it hasn't been
157 // scheduled already.
158 if (scheduled_timers_.insert(when).second) {
159 base::TimeDelta delay = CalcTimeDelta(when);
160 base::Closure closure =
161 base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
162 delegate_->RunOnPluginThread(
163 delay,
164 &PluginThreadTaskRunner::TaskSpringboard,
165 new base::Closure(closure));
166 }
167 } else {
168 // Spin the shutdown loop if the task runner has already been detached.
169 // The shutdown loop will pick the tasks to run itself.
170 event_.Signal();
171 }
172 }
173
ProcessIncomingTasks()174 void PluginThreadTaskRunner::ProcessIncomingTasks() {
175 DCHECK(BelongsToCurrentThread());
176
177 // Grab all unsorted tasks accomulated so far.
178 base::TaskQueue work_queue;
179 {
180 base::AutoLock locked(lock_);
181 incoming_queue_.Swap(&work_queue);
182 }
183
184 while (!work_queue.empty()) {
185 base::PendingTask pending_task = work_queue.front();
186 work_queue.pop();
187
188 if (pending_task.delayed_run_time.is_null()) {
189 pending_task.task.Run();
190 } else {
191 delayed_queue_.push(pending_task);
192 }
193 }
194 }
195
RunDelayedTasks(base::TimeTicks when)196 void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
197 DCHECK(BelongsToCurrentThread());
198
199 scheduled_timers_.erase(when);
200
201 // |stopped_| is updated by the plugin thread only, so it is safe to access
202 // it here without taking the lock.
203 if (!stopped_) {
204 ProcessIncomingTasks();
205 RunDueTasks(base::TimeTicks::Now());
206 }
207 }
208
RunDueTasks(base::TimeTicks now)209 void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
210 DCHECK(BelongsToCurrentThread());
211
212 // Run all due tasks.
213 while (!delayed_queue_.empty() &&
214 delayed_queue_.top().delayed_run_time <= now) {
215 delayed_queue_.top().task.Run();
216 delayed_queue_.pop();
217 }
218
219 // Post a delayed asynchronous call to the plugin thread to process tasks from
220 // the delayed queue.
221 if (!delayed_queue_.empty()) {
222 base::TimeTicks when = delayed_queue_.top().delayed_run_time;
223 if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) {
224 PostDelayedRunTasks(when);
225 }
226 }
227 }
228
RunTasks()229 void PluginThreadTaskRunner::RunTasks() {
230 DCHECK(BelongsToCurrentThread());
231
232 // |stopped_| is updated by the plugin thread only, so it is safe to access
233 // it here without taking the lock.
234 if (!stopped_) {
235 ProcessIncomingTasks();
236 RunDueTasks(base::TimeTicks::Now());
237 }
238 }
239
240 // static
TaskSpringboard(void * data)241 void PluginThreadTaskRunner::TaskSpringboard(void* data) {
242 base::Closure* task = reinterpret_cast<base::Closure*>(data);
243 task->Run();
244 delete task;
245 }
246
247 } // namespace remoting
248