1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cc/resources/task_graph_runner.h"
6
7 #include <algorithm>
8
9 #include "base/debug/trace_event.h"
10 #include "base/strings/stringprintf.h"
11 #include "base/threading/thread_restrictions.h"
12
13 namespace cc {
14 namespace {
15
16 // Helper class for iterating over all dependents of a task.
17 class DependentIterator {
18 public:
DependentIterator(TaskGraph * graph,const Task * task)19 DependentIterator(TaskGraph* graph, const Task* task)
20 : graph_(graph), task_(task), current_index_(-1), current_node_(NULL) {
21 ++(*this);
22 }
23
operator ->() const24 TaskGraph::Node& operator->() const {
25 DCHECK_LT(current_index_, graph_->edges.size());
26 DCHECK_EQ(graph_->edges[current_index_].task, task_);
27 DCHECK(current_node_);
28 return *current_node_;
29 }
30
operator *() const31 TaskGraph::Node& operator*() const {
32 DCHECK_LT(current_index_, graph_->edges.size());
33 DCHECK_EQ(graph_->edges[current_index_].task, task_);
34 DCHECK(current_node_);
35 return *current_node_;
36 }
37
38 // Note: Performance can be improved by keeping edges sorted.
operator ++()39 DependentIterator& operator++() {
40 // Find next dependency edge for |task_|.
41 do {
42 ++current_index_;
43 if (current_index_ == graph_->edges.size())
44 return *this;
45 } while (graph_->edges[current_index_].task != task_);
46
47 // Now find the node for the dependent of this edge.
48 TaskGraph::Node::Vector::iterator it =
49 std::find_if(graph_->nodes.begin(),
50 graph_->nodes.end(),
51 TaskGraph::Node::TaskComparator(
52 graph_->edges[current_index_].dependent));
53 DCHECK(it != graph_->nodes.end());
54 current_node_ = &(*it);
55
56 return *this;
57 }
58
operator bool() const59 operator bool() const { return current_index_ < graph_->edges.size(); }
60
61 private:
62 TaskGraph* graph_;
63 const Task* task_;
64 size_t current_index_;
65 TaskGraph::Node* current_node_;
66 };
67
68 class DependencyMismatchComparator {
69 public:
DependencyMismatchComparator(const TaskGraph * graph)70 explicit DependencyMismatchComparator(const TaskGraph* graph)
71 : graph_(graph) {}
72
operator ()(const TaskGraph::Node & node) const73 bool operator()(const TaskGraph::Node& node) const {
74 return static_cast<size_t>(std::count_if(graph_->edges.begin(),
75 graph_->edges.end(),
76 DependentComparator(node.task))) !=
77 node.dependencies;
78 }
79
80 private:
81 class DependentComparator {
82 public:
DependentComparator(const Task * dependent)83 explicit DependentComparator(const Task* dependent)
84 : dependent_(dependent) {}
85
operator ()(const TaskGraph::Edge & edge) const86 bool operator()(const TaskGraph::Edge& edge) const {
87 return edge.dependent == dependent_;
88 }
89
90 private:
91 const Task* dependent_;
92 };
93
94 const TaskGraph* graph_;
95 };
96
97 } // namespace
98
Task()99 Task::Task() : will_run_(false), did_run_(false) {
100 }
101
~Task()102 Task::~Task() {
103 DCHECK(!will_run_);
104 }
105
WillRun()106 void Task::WillRun() {
107 DCHECK(!will_run_);
108 DCHECK(!did_run_);
109 will_run_ = true;
110 }
111
DidRun()112 void Task::DidRun() {
113 DCHECK(will_run_);
114 will_run_ = false;
115 did_run_ = true;
116 }
117
HasFinishedRunning() const118 bool Task::HasFinishedRunning() const { return did_run_; }
119
TaskGraph()120 TaskGraph::TaskGraph() {}
121
~TaskGraph()122 TaskGraph::~TaskGraph() {}
123
Swap(TaskGraph * other)124 void TaskGraph::Swap(TaskGraph* other) {
125 nodes.swap(other->nodes);
126 edges.swap(other->edges);
127 }
128
Reset()129 void TaskGraph::Reset() {
130 nodes.clear();
131 edges.clear();
132 }
133
TaskNamespace()134 TaskGraphRunner::TaskNamespace::TaskNamespace() {}
135
~TaskNamespace()136 TaskGraphRunner::TaskNamespace::~TaskNamespace() {}
137
TaskGraphRunner()138 TaskGraphRunner::TaskGraphRunner()
139 : lock_(),
140 has_ready_to_run_tasks_cv_(&lock_),
141 has_namespaces_with_finished_running_tasks_cv_(&lock_),
142 next_namespace_id_(1),
143 shutdown_(false) {}
144
~TaskGraphRunner()145 TaskGraphRunner::~TaskGraphRunner() {
146 {
147 base::AutoLock lock(lock_);
148
149 DCHECK_EQ(0u, ready_to_run_namespaces_.size());
150 DCHECK_EQ(0u, namespaces_.size());
151 }
152 }
153
GetNamespaceToken()154 NamespaceToken TaskGraphRunner::GetNamespaceToken() {
155 base::AutoLock lock(lock_);
156
157 NamespaceToken token(next_namespace_id_++);
158 DCHECK(namespaces_.find(token.id_) == namespaces_.end());
159 return token;
160 }
161
ScheduleTasks(NamespaceToken token,TaskGraph * graph)162 void TaskGraphRunner::ScheduleTasks(NamespaceToken token, TaskGraph* graph) {
163 TRACE_EVENT2("cc",
164 "TaskGraphRunner::ScheduleTasks",
165 "num_nodes",
166 graph->nodes.size(),
167 "num_edges",
168 graph->edges.size());
169
170 DCHECK(token.IsValid());
171 DCHECK(std::find_if(graph->nodes.begin(),
172 graph->nodes.end(),
173 DependencyMismatchComparator(graph)) ==
174 graph->nodes.end());
175
176 {
177 base::AutoLock lock(lock_);
178
179 DCHECK(!shutdown_);
180
181 TaskNamespace& task_namespace = namespaces_[token.id_];
182
183 // First adjust number of dependencies to reflect completed tasks.
184 for (Task::Vector::iterator it = task_namespace.completed_tasks.begin();
185 it != task_namespace.completed_tasks.end();
186 ++it) {
187 for (DependentIterator node_it(graph, it->get()); node_it; ++node_it) {
188 TaskGraph::Node& node = *node_it;
189 DCHECK_LT(0u, node.dependencies);
190 node.dependencies--;
191 }
192 }
193
194 // Build new "ready to run" queue and remove nodes from old graph.
195 task_namespace.ready_to_run_tasks.clear();
196 for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin();
197 it != graph->nodes.end();
198 ++it) {
199 TaskGraph::Node& node = *it;
200
201 // Remove any old nodes that are associated with this task. The result is
202 // that the old graph is left with all nodes not present in this graph,
203 // which we use below to determine what tasks need to be canceled.
204 TaskGraph::Node::Vector::iterator old_it =
205 std::find_if(task_namespace.graph.nodes.begin(),
206 task_namespace.graph.nodes.end(),
207 TaskGraph::Node::TaskComparator(node.task));
208 if (old_it != task_namespace.graph.nodes.end()) {
209 std::swap(*old_it, task_namespace.graph.nodes.back());
210 task_namespace.graph.nodes.pop_back();
211 }
212
213 // Task is not ready to run if dependencies are not yet satisfied.
214 if (node.dependencies)
215 continue;
216
217 // Skip if already finished running task.
218 if (node.task->HasFinishedRunning())
219 continue;
220
221 // Skip if already running.
222 if (std::find(task_namespace.running_tasks.begin(),
223 task_namespace.running_tasks.end(),
224 node.task) != task_namespace.running_tasks.end())
225 continue;
226
227 task_namespace.ready_to_run_tasks.push_back(
228 PrioritizedTask(node.task, node.priority));
229 }
230
231 // Rearrange the elements in |ready_to_run_tasks| in such a way that they
232 // form a heap.
233 std::make_heap(task_namespace.ready_to_run_tasks.begin(),
234 task_namespace.ready_to_run_tasks.end(),
235 CompareTaskPriority);
236
237 // Swap task graph.
238 task_namespace.graph.Swap(graph);
239
240 // Determine what tasks in old graph need to be canceled.
241 for (TaskGraph::Node::Vector::iterator it = graph->nodes.begin();
242 it != graph->nodes.end();
243 ++it) {
244 TaskGraph::Node& node = *it;
245
246 // Skip if already finished running task.
247 if (node.task->HasFinishedRunning())
248 continue;
249
250 // Skip if already running.
251 if (std::find(task_namespace.running_tasks.begin(),
252 task_namespace.running_tasks.end(),
253 node.task) != task_namespace.running_tasks.end())
254 continue;
255
256 DCHECK(std::find(task_namespace.completed_tasks.begin(),
257 task_namespace.completed_tasks.end(),
258 node.task) == task_namespace.completed_tasks.end());
259 task_namespace.completed_tasks.push_back(node.task);
260 }
261
262 // Build new "ready to run" task namespaces queue.
263 ready_to_run_namespaces_.clear();
264 for (TaskNamespaceMap::iterator it = namespaces_.begin();
265 it != namespaces_.end();
266 ++it) {
267 if (!it->second.ready_to_run_tasks.empty())
268 ready_to_run_namespaces_.push_back(&it->second);
269 }
270
271 // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way
272 // that they form a heap.
273 std::make_heap(ready_to_run_namespaces_.begin(),
274 ready_to_run_namespaces_.end(),
275 CompareTaskNamespacePriority);
276
277 // If there is more work available, wake up worker thread.
278 if (!ready_to_run_namespaces_.empty())
279 has_ready_to_run_tasks_cv_.Signal();
280 }
281 }
282
WaitForTasksToFinishRunning(NamespaceToken token)283 void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
284 TRACE_EVENT0("cc", "TaskGraphRunner::WaitForTasksToFinishRunning");
285
286 DCHECK(token.IsValid());
287
288 {
289 base::AutoLock lock(lock_);
290
291 TaskNamespaceMap::const_iterator it = namespaces_.find(token.id_);
292 if (it == namespaces_.end())
293 return;
294
295 const TaskNamespace& task_namespace = it->second;
296
297 while (!HasFinishedRunningTasksInNamespace(&task_namespace))
298 has_namespaces_with_finished_running_tasks_cv_.Wait();
299
300 // There may be other namespaces that have finished running tasks, so wake
301 // up another origin thread.
302 has_namespaces_with_finished_running_tasks_cv_.Signal();
303 }
304 }
305
CollectCompletedTasks(NamespaceToken token,Task::Vector * completed_tasks)306 void TaskGraphRunner::CollectCompletedTasks(NamespaceToken token,
307 Task::Vector* completed_tasks) {
308 TRACE_EVENT0("cc", "TaskGraphRunner::CollectCompletedTasks");
309
310 DCHECK(token.IsValid());
311
312 {
313 base::AutoLock lock(lock_);
314
315 TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
316 if (it == namespaces_.end())
317 return;
318
319 TaskNamespace& task_namespace = it->second;
320
321 DCHECK_EQ(0u, completed_tasks->size());
322 completed_tasks->swap(task_namespace.completed_tasks);
323 if (!HasFinishedRunningTasksInNamespace(&task_namespace))
324 return;
325
326 // Remove namespace if finished running tasks.
327 DCHECK_EQ(0u, task_namespace.completed_tasks.size());
328 DCHECK_EQ(0u, task_namespace.ready_to_run_tasks.size());
329 DCHECK_EQ(0u, task_namespace.running_tasks.size());
330 namespaces_.erase(it);
331 }
332 }
333
Shutdown()334 void TaskGraphRunner::Shutdown() {
335 base::AutoLock lock(lock_);
336
337 DCHECK_EQ(0u, ready_to_run_namespaces_.size());
338 DCHECK_EQ(0u, namespaces_.size());
339
340 DCHECK(!shutdown_);
341 shutdown_ = true;
342
343 // Wake up a worker so it knows it should exit. This will cause all workers
344 // to exit as each will wake up another worker before exiting.
345 has_ready_to_run_tasks_cv_.Signal();
346 }
347
Run()348 void TaskGraphRunner::Run() {
349 base::AutoLock lock(lock_);
350
351 while (true) {
352 if (ready_to_run_namespaces_.empty()) {
353 // Exit when shutdown is set and no more tasks are pending.
354 if (shutdown_)
355 break;
356
357 // Wait for more tasks.
358 has_ready_to_run_tasks_cv_.Wait();
359 continue;
360 }
361
362 RunTaskWithLockAcquired();
363 }
364
365 // We noticed we should exit. Wake up the next worker so it knows it should
366 // exit as well (because the Shutdown() code only signals once).
367 has_ready_to_run_tasks_cv_.Signal();
368 }
369
RunUntilIdle()370 void TaskGraphRunner::RunUntilIdle() {
371 base::AutoLock lock(lock_);
372
373 while (!ready_to_run_namespaces_.empty())
374 RunTaskWithLockAcquired();
375 }
376
RunTaskWithLockAcquired()377 void TaskGraphRunner::RunTaskWithLockAcquired() {
378 TRACE_EVENT0("toplevel", "TaskGraphRunner::RunTask");
379
380 lock_.AssertAcquired();
381 DCHECK(!ready_to_run_namespaces_.empty());
382
383 // Take top priority TaskNamespace from |ready_to_run_namespaces_|.
384 std::pop_heap(ready_to_run_namespaces_.begin(),
385 ready_to_run_namespaces_.end(),
386 CompareTaskNamespacePriority);
387 TaskNamespace* task_namespace = ready_to_run_namespaces_.back();
388 ready_to_run_namespaces_.pop_back();
389 DCHECK(!task_namespace->ready_to_run_tasks.empty());
390
391 // Take top priority task from |ready_to_run_tasks|.
392 std::pop_heap(task_namespace->ready_to_run_tasks.begin(),
393 task_namespace->ready_to_run_tasks.end(),
394 CompareTaskPriority);
395 scoped_refptr<Task> task(task_namespace->ready_to_run_tasks.back().task);
396 task_namespace->ready_to_run_tasks.pop_back();
397
398 // Add task namespace back to |ready_to_run_namespaces_| if not empty after
399 // taking top priority task.
400 if (!task_namespace->ready_to_run_tasks.empty()) {
401 ready_to_run_namespaces_.push_back(task_namespace);
402 std::push_heap(ready_to_run_namespaces_.begin(),
403 ready_to_run_namespaces_.end(),
404 CompareTaskNamespacePriority);
405 }
406
407 // Add task to |running_tasks|.
408 task_namespace->running_tasks.push_back(task.get());
409
410 // There may be more work available, so wake up another worker thread.
411 has_ready_to_run_tasks_cv_.Signal();
412
413 // Call WillRun() before releasing |lock_| and running task.
414 task->WillRun();
415
416 {
417 base::AutoUnlock unlock(lock_);
418
419 task->RunOnWorkerThread();
420 }
421
422 // This will mark task as finished running.
423 task->DidRun();
424
425 // Remove task from |running_tasks|.
426 TaskVector::iterator it = std::find(task_namespace->running_tasks.begin(),
427 task_namespace->running_tasks.end(),
428 task.get());
429 DCHECK(it != task_namespace->running_tasks.end());
430 std::swap(*it, task_namespace->running_tasks.back());
431 task_namespace->running_tasks.pop_back();
432
433 // Now iterate over all dependents to decrement dependencies and check if they
434 // are ready to run.
435 bool ready_to_run_namespaces_has_heap_properties = true;
436 for (DependentIterator it(&task_namespace->graph, task.get()); it; ++it) {
437 TaskGraph::Node& dependent_node = *it;
438
439 DCHECK_LT(0u, dependent_node.dependencies);
440 dependent_node.dependencies--;
441 // Task is ready if it has no dependencies. Add it to |ready_to_run_tasks_|.
442 if (!dependent_node.dependencies) {
443 bool was_empty = task_namespace->ready_to_run_tasks.empty();
444 task_namespace->ready_to_run_tasks.push_back(
445 PrioritizedTask(dependent_node.task, dependent_node.priority));
446 std::push_heap(task_namespace->ready_to_run_tasks.begin(),
447 task_namespace->ready_to_run_tasks.end(),
448 CompareTaskPriority);
449 // Task namespace is ready if it has at least one ready to run task. Add
450 // it to |ready_to_run_namespaces_| if it just become ready.
451 if (was_empty) {
452 DCHECK(std::find(ready_to_run_namespaces_.begin(),
453 ready_to_run_namespaces_.end(),
454 task_namespace) == ready_to_run_namespaces_.end());
455 ready_to_run_namespaces_.push_back(task_namespace);
456 }
457 ready_to_run_namespaces_has_heap_properties = false;
458 }
459 }
460
461 // Rearrange the task namespaces in |ready_to_run_namespaces_| in such a way
462 // that they yet again form a heap.
463 if (!ready_to_run_namespaces_has_heap_properties) {
464 std::make_heap(ready_to_run_namespaces_.begin(),
465 ready_to_run_namespaces_.end(),
466 CompareTaskNamespacePriority);
467 }
468
469 // Finally add task to |completed_tasks_|.
470 task_namespace->completed_tasks.push_back(task);
471
472 // If namespace has finished running all tasks, wake up origin thread.
473 if (HasFinishedRunningTasksInNamespace(task_namespace))
474 has_namespaces_with_finished_running_tasks_cv_.Signal();
475 }
476
477 } // namespace cc
478