• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "src/heap/item-parallel-job.h"
6 
7 #include "src/base/platform/semaphore.h"
8 #include "src/init/v8.h"
9 #include "src/logging/counters.h"
10 
11 namespace v8 {
12 namespace internal {
13 
Task(Isolate * isolate)14 ItemParallelJob::Task::Task(Isolate* isolate) : CancelableTask(isolate) {}
15 
SetupInternal(base::Semaphore * on_finish,std::vector<Item * > * items,size_t start_index)16 void ItemParallelJob::Task::SetupInternal(base::Semaphore* on_finish,
17                                           std::vector<Item*>* items,
18                                           size_t start_index) {
19   on_finish_ = on_finish;
20   items_ = items;
21 
22   if (start_index < items->size()) {
23     cur_index_ = start_index;
24   } else {
25     items_considered_ = items_->size();
26   }
27 }
28 
WillRunOnForeground()29 void ItemParallelJob::Task::WillRunOnForeground() {
30   runner_ = Runner::kForeground;
31 }
32 
RunInternal()33 void ItemParallelJob::Task::RunInternal() {
34   RunInParallel(runner_);
35   on_finish_->Signal();
36 }
37 
ItemParallelJob(CancelableTaskManager * cancelable_task_manager,base::Semaphore * pending_tasks)38 ItemParallelJob::ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
39                                  base::Semaphore* pending_tasks)
40     : cancelable_task_manager_(cancelable_task_manager),
41       pending_tasks_(pending_tasks) {}
42 
~ItemParallelJob()43 ItemParallelJob::~ItemParallelJob() {
44   for (size_t i = 0; i < items_.size(); i++) {
45     Item* item = items_[i];
46     CHECK(item->IsFinished());
47     delete item;
48   }
49 }
50 
Run()51 void ItemParallelJob::Run() {
52   DCHECK_GT(tasks_.size(), 0);
53   const size_t num_items = items_.size();
54   const size_t num_tasks = tasks_.size();
55 
56   TRACE_EVENT_INSTANT2(TRACE_DISABLED_BY_DEFAULT("v8.gc"),
57                        "ItemParallelJob::Run", TRACE_EVENT_SCOPE_THREAD,
58                        "num_tasks", static_cast<int>(num_tasks), "num_items",
59                        static_cast<int>(num_items));
60 
61   // Some jobs have more tasks than items (when the items are mere coarse
62   // grain tasks that generate work dynamically for a second phase which all
63   // tasks participate in). Some jobs even have 0 items to preprocess but
64   // still have multiple tasks.
65   // TODO(gab): Figure out a cleaner scheme for this.
66   const size_t num_tasks_processing_items = Min(num_items, tasks_.size());
67 
68   // In the event of an uneven workload, distribute an extra item to the first
69   // |items_remainder| tasks.
70   const size_t items_remainder = num_tasks_processing_items > 0
71                                      ? num_items % num_tasks_processing_items
72                                      : 0;
73   // Base |items_per_task|, will be bumped by 1 for the first
74   // |items_remainder| tasks.
75   const size_t items_per_task = num_tasks_processing_items > 0
76                                     ? num_items / num_tasks_processing_items
77                                     : 0;
78   CancelableTaskManager::Id* task_ids =
79       new CancelableTaskManager::Id[num_tasks];
80   std::unique_ptr<Task> main_task;
81   for (size_t i = 0, start_index = 0; i < num_tasks;
82        i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
83     auto task = std::move(tasks_[i]);
84     DCHECK(task);
85 
86     // By definition there are less |items_remainder| to distribute then
87     // there are tasks processing items so this cannot overflow while we are
88     // assigning work items.
89     DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);
90 
91     task->SetupInternal(pending_tasks_, &items_, start_index);
92     task_ids[i] = task->id();
93     if (i > 0) {
94       V8::GetCurrentPlatform()->CallBlockingTaskOnWorkerThread(std::move(task));
95     } else {
96       main_task = std::move(task);
97     }
98   }
99 
100   // Contribute on main thread.
101   DCHECK(main_task);
102   main_task->WillRunOnForeground();
103   main_task->Run();
104 
105   // Wait for background tasks.
106   for (size_t i = 0; i < num_tasks; i++) {
107     if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
108         TryAbortResult::kTaskAborted) {
109       pending_tasks_->Wait();
110     }
111   }
112   delete[] task_ids;
113 }
114 
115 }  // namespace internal
116 }  // namespace v8
117