1 // Copyright 2018 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "src/heap/item-parallel-job.h"
6
7 #include "src/base/platform/semaphore.h"
8 #include "src/init/v8.h"
9 #include "src/logging/counters.h"
10
11 namespace v8 {
12 namespace internal {
13
Task(Isolate * isolate)14 ItemParallelJob::Task::Task(Isolate* isolate) : CancelableTask(isolate) {}
15
SetupInternal(base::Semaphore * on_finish,std::vector<Item * > * items,size_t start_index)16 void ItemParallelJob::Task::SetupInternal(base::Semaphore* on_finish,
17 std::vector<Item*>* items,
18 size_t start_index) {
19 on_finish_ = on_finish;
20 items_ = items;
21
22 if (start_index < items->size()) {
23 cur_index_ = start_index;
24 } else {
25 items_considered_ = items_->size();
26 }
27 }
28
WillRunOnForeground()29 void ItemParallelJob::Task::WillRunOnForeground() {
30 runner_ = Runner::kForeground;
31 }
32
RunInternal()33 void ItemParallelJob::Task::RunInternal() {
34 RunInParallel(runner_);
35 on_finish_->Signal();
36 }
37
ItemParallelJob(CancelableTaskManager * cancelable_task_manager,base::Semaphore * pending_tasks)38 ItemParallelJob::ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
39 base::Semaphore* pending_tasks)
40 : cancelable_task_manager_(cancelable_task_manager),
41 pending_tasks_(pending_tasks) {}
42
~ItemParallelJob()43 ItemParallelJob::~ItemParallelJob() {
44 for (size_t i = 0; i < items_.size(); i++) {
45 Item* item = items_[i];
46 CHECK(item->IsFinished());
47 delete item;
48 }
49 }
50
Run()51 void ItemParallelJob::Run() {
52 DCHECK_GT(tasks_.size(), 0);
53 const size_t num_items = items_.size();
54 const size_t num_tasks = tasks_.size();
55
56 TRACE_EVENT_INSTANT2(TRACE_DISABLED_BY_DEFAULT("v8.gc"),
57 "ItemParallelJob::Run", TRACE_EVENT_SCOPE_THREAD,
58 "num_tasks", static_cast<int>(num_tasks), "num_items",
59 static_cast<int>(num_items));
60
61 // Some jobs have more tasks than items (when the items are mere coarse
62 // grain tasks that generate work dynamically for a second phase which all
63 // tasks participate in). Some jobs even have 0 items to preprocess but
64 // still have multiple tasks.
65 // TODO(gab): Figure out a cleaner scheme for this.
66 const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
67
68 // In the event of an uneven workload, distribute an extra item to the first
69 // |items_remainder| tasks.
70 const size_t items_remainder = num_tasks_processing_items > 0
71 ? num_items % num_tasks_processing_items
72 : 0;
73 // Base |items_per_task|, will be bumped by 1 for the first
74 // |items_remainder| tasks.
75 const size_t items_per_task = num_tasks_processing_items > 0
76 ? num_items / num_tasks_processing_items
77 : 0;
78 CancelableTaskManager::Id* task_ids =
79 new CancelableTaskManager::Id[num_tasks];
80 std::unique_ptr<Task> main_task;
81 for (size_t i = 0, start_index = 0; i < num_tasks;
82 i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
83 auto task = std::move(tasks_[i]);
84 DCHECK(task);
85
86 // By definition there are less |items_remainder| to distribute then
87 // there are tasks processing items so this cannot overflow while we are
88 // assigning work items.
89 DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
90
91 task->SetupInternal(pending_tasks_, &items_, start_index);
92 task_ids[i] = task->id();
93 if (i > 0) {
94 V8::GetCurrentPlatform()->CallBlockingTaskOnWorkerThread(std::move(task));
95 } else {
96 main_task = std::move(task);
97 }
98 }
99
100 // Contribute on main thread.
101 DCHECK(main_task);
102 main_task->WillRunOnForeground();
103 main_task->Run();
104
105 // Wait for background tasks.
106 for (size_t i = 0; i < num_tasks; i++) {
107 if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
108 TryAbortResult::kTaskAborted) {
109 pending_tasks_->Wait();
110 }
111 }
112 delete[] task_ids;
113 }
114
115 } // namespace internal
116 } // namespace v8
117