• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/base/plugin_thread_task_runner.h"
6 
7 #include "base/bind.h"
8 
9 namespace {
10 
CalcTimeDelta(base::TimeTicks when)11 base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
12   return std::max(when - base::TimeTicks::Now(), base::TimeDelta());
13 }
14 
15 }  // namespace
16 
17 namespace remoting {
18 
~Delegate()19 PluginThreadTaskRunner::Delegate::~Delegate() {
20 }
21 
PluginThreadTaskRunner(Delegate * delegate)22 PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
23     : plugin_thread_id_(base::PlatformThread::CurrentId()),
24       event_(false, false),
25       delegate_(delegate),
26       next_sequence_num_(0),
27       quit_received_(false),
28       stopped_(false) {
29 }
30 
~PluginThreadTaskRunner()31 PluginThreadTaskRunner::~PluginThreadTaskRunner() {
32   DCHECK(delegate_ == NULL);
33   DCHECK(stopped_);
34 }
35 
DetachAndRunShutdownLoop()36 void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
37   DCHECK(BelongsToCurrentThread());
38 
39   // Detach from the plugin thread and redirect all tasks posted after this
40   // point to the shutdown task loop.
41   {
42     base::AutoLock auto_lock(lock_);
43 
44     DCHECK(delegate_ != NULL);
45     DCHECK(!stopped_);
46 
47     delegate_ = NULL;
48     stopped_ = quit_received_;
49   }
50 
51   // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
52   // timers are cancelled. It is OK to clear |scheduled_timers_| even if
53   // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
54   // called before NPP_Destroy()).
55   scheduled_timers_.clear();
56 
57   // Run all tasks that are due.
58   ProcessIncomingTasks();
59   RunDueTasks(base::TimeTicks::Now());
60 
61   while (!stopped_) {
62     if (delayed_queue_.empty()) {
63       event_.Wait();
64     } else {
65       event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
66     }
67 
68     // Run all tasks that are due.
69     ProcessIncomingTasks();
70     RunDueTasks(base::TimeTicks::Now());
71 
72     base::AutoLock auto_lock(lock_);
73     stopped_ = quit_received_;
74   }
75 }
76 
Quit()77 void PluginThreadTaskRunner::Quit() {
78   base::AutoLock auto_lock(lock_);
79 
80   if (!quit_received_) {
81     quit_received_ = true;
82     event_.Signal();
83   }
84 }
85 
PostDelayedTask(const tracked_objects::Location & from_here,const base::Closure & task,base::TimeDelta delay)86 bool PluginThreadTaskRunner::PostDelayedTask(
87     const tracked_objects::Location& from_here,
88     const base::Closure& task,
89     base::TimeDelta delay) {
90 
91   // Wrap the task into |base::PendingTask|.
92   base::TimeTicks delayed_run_time;
93   if (delay > base::TimeDelta()) {
94     delayed_run_time = base::TimeTicks::Now() + delay;
95   } else {
96     DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
97   }
98 
99   base::PendingTask pending_task(from_here, task, delayed_run_time, false);
100 
101   // Push the task to the incoming queue.
102   base::AutoLock locked(lock_);
103 
104   // Initialize the sequence number. The sequence number provides FIFO ordering
105   // for tasks with the same |delayed_run_time|.
106   pending_task.sequence_num = next_sequence_num_++;
107 
108   // Post an asynchronous call on the plugin thread to process the task.
109   if (incoming_queue_.empty()) {
110     PostRunTasks();
111   }
112 
113   incoming_queue_.push(pending_task);
114   pending_task.task.Reset();
115 
116   // No tasks should be posted after Quit() has been called.
117   DCHECK(!quit_received_);
118   return true;
119 }
120 
PostNonNestableDelayedTask(const tracked_objects::Location & from_here,const base::Closure & task,base::TimeDelta delay)121 bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
122     const tracked_objects::Location& from_here,
123     const base::Closure& task,
124     base::TimeDelta delay) {
125   // All tasks running on this task loop are non-nestable.
126   return PostDelayedTask(from_here, task, delay);
127 }
128 
RunsTasksOnCurrentThread() const129 bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
130   // In pepper plugins ideally we should use pp::Core::IsMainThread,
131   // but it is problematic because we would need to keep reference to
132   // Core somewhere, e.g. make the delegate ref-counted.
133   return base::PlatformThread::CurrentId() == plugin_thread_id_;
134 }
135 
PostRunTasks()136 void PluginThreadTaskRunner::PostRunTasks() {
137   // Post tasks to the plugin thread when it is availabe or spin the shutdown
138   // task loop.
139   if (delegate_ != NULL) {
140     base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
141     delegate_->RunOnPluginThread(
142         base::TimeDelta(),
143         &PluginThreadTaskRunner::TaskSpringboard,
144         new base::Closure(closure));
145   } else {
146     event_.Signal();
147   }
148 }
149 
PostDelayedRunTasks(base::TimeTicks when)150 void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
151   DCHECK(BelongsToCurrentThread());
152 
153   // |delegate_| is updated from the plugin thread only, so it is safe to access
154   // it here without taking the lock.
155   if (delegate_ != NULL) {
156     // Schedule RunDelayedTasks() to be called at |when| if it hasn't been
157     // scheduled already.
158     if (scheduled_timers_.insert(when).second) {
159       base::TimeDelta delay = CalcTimeDelta(when);
160       base::Closure closure =
161           base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
162       delegate_->RunOnPluginThread(
163           delay,
164           &PluginThreadTaskRunner::TaskSpringboard,
165           new base::Closure(closure));
166     }
167   } else {
168     // Spin the shutdown loop if the task runner has already been detached.
169     // The shutdown loop will pick the tasks to run itself.
170     event_.Signal();
171   }
172 }
173 
ProcessIncomingTasks()174 void PluginThreadTaskRunner::ProcessIncomingTasks() {
175   DCHECK(BelongsToCurrentThread());
176 
177   // Grab all unsorted tasks accomulated so far.
178   base::TaskQueue work_queue;
179   {
180     base::AutoLock locked(lock_);
181     incoming_queue_.Swap(&work_queue);
182   }
183 
184   while (!work_queue.empty()) {
185     base::PendingTask pending_task = work_queue.front();
186     work_queue.pop();
187 
188     if (pending_task.delayed_run_time.is_null()) {
189       pending_task.task.Run();
190     } else {
191       delayed_queue_.push(pending_task);
192     }
193   }
194 }
195 
RunDelayedTasks(base::TimeTicks when)196 void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
197   DCHECK(BelongsToCurrentThread());
198 
199   scheduled_timers_.erase(when);
200 
201   // |stopped_| is updated by the plugin thread only, so it is safe to access
202   // it here without taking the lock.
203   if (!stopped_) {
204     ProcessIncomingTasks();
205     RunDueTasks(base::TimeTicks::Now());
206   }
207 }
208 
RunDueTasks(base::TimeTicks now)209 void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
210   DCHECK(BelongsToCurrentThread());
211 
212   // Run all due tasks.
213   while (!delayed_queue_.empty() &&
214          delayed_queue_.top().delayed_run_time <= now) {
215     delayed_queue_.top().task.Run();
216     delayed_queue_.pop();
217   }
218 
219   // Post a delayed asynchronous call to the plugin thread to process tasks from
220   // the delayed queue.
221   if (!delayed_queue_.empty()) {
222     base::TimeTicks when = delayed_queue_.top().delayed_run_time;
223     if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) {
224       PostDelayedRunTasks(when);
225     }
226   }
227 }
228 
RunTasks()229 void PluginThreadTaskRunner::RunTasks() {
230   DCHECK(BelongsToCurrentThread());
231 
232   // |stopped_| is updated by the plugin thread only, so it is safe to access
233   // it here without taking the lock.
234   if (!stopped_) {
235     ProcessIncomingTasks();
236     RunDueTasks(base::TimeTicks::Now());
237   }
238 }
239 
240 // static
TaskSpringboard(void * data)241 void PluginThreadTaskRunner::TaskSpringboard(void* data) {
242   base::Closure* task = reinterpret_cast<base::Closure*>(data);
243   task->Run();
244   delete task;
245 }
246 
247 }  // namespace remoting
248