// Copyright 2018 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/heap/item-parallel-job.h"

#include "src/base/platform/semaphore.h"
#include "src/v8.h"

namespace v8 {
namespace internal {

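// Overview (summarized from the code below): ItemParallelJob distributes a
// set of work items across a set of tasks. Task 0 runs synchronously on the
// calling thread while the remaining tasks are posted to worker threads, and
// Run() waits on |pending_tasks_| for every task that could not be aborted.
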
ItemParallelJob::Task::Task(Isolate* isolate) : CancelableTask(isolate) {}

ItemParallelJob::Task::~Task() {
  // The histogram is reset in RunInternal(). If it's still around it means
  // this task was cancelled before being scheduled.
  if (gc_parallel_task_latency_histogram_)
    gc_parallel_task_latency_histogram_->RecordAbandon();
}

void ItemParallelJob::Task::SetupInternal(
    base::Semaphore* on_finish, std::vector<Item*>* items, size_t start_index,
    base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram) {
  on_finish_ = on_finish;
  items_ = items;

  if (start_index < items->size()) {
    cur_index_ = start_index;
  } else {
    // There are no items left for this task; mark all items as already
    // considered so that it does not try to claim any.
    items_considered_ = items_->size();
  }

  gc_parallel_task_latency_histogram_ =
      std::move(gc_parallel_task_latency_histogram);
}

void ItemParallelJob::Task::RunInternal() {
  if (gc_parallel_task_latency_histogram_) {
    // The task is running now, so record its scheduling latency and drop the
    // histogram; the destructor only records an abandon if it is still set.
    gc_parallel_task_latency_histogram_->RecordDone();
    gc_parallel_task_latency_histogram_.reset();
  }

  RunInParallel();
  on_finish_->Signal();
}

ItemParallelJob::ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
                                 base::Semaphore* pending_tasks)
    : cancelable_task_manager_(cancelable_task_manager),
      pending_tasks_(pending_tasks) {}

ItemParallelJob::~ItemParallelJob() {
  // The job owns its items; every one of them must have been processed by
  // the time the job dies.
  for (size_t i = 0; i < items_.size(); i++) {
    Item* item = items_[i];
    CHECK(item->IsFinished());
    delete item;
  }
}

void ItemParallelJob::Run(std::shared_ptr<Counters> async_counters) {
  DCHECK_GT(tasks_.size(), 0);
  const size_t num_items = items_.size();
  const size_t num_tasks = tasks_.size();

  TRACE_EVENT_INSTANT2(TRACE_DISABLED_BY_DEFAULT("v8.gc"),
                       "ItemParallelJob::Run", TRACE_EVENT_SCOPE_THREAD,
                       "num_tasks", static_cast<int>(num_tasks), "num_items",
                       static_cast<int>(num_items));

  AsyncTimedHistogram gc_parallel_task_latency_histogram(
      async_counters->gc_parallel_task_latency(), async_counters);

  // Some jobs have more tasks than items (when the items are mere
  // coarse-grain tasks that generate work dynamically for a second phase
  // which all tasks participate in). Some jobs even have 0 items to
  // preprocess but still have multiple tasks.
  // TODO(gab): Figure out a cleaner scheme for this.
  const size_t num_tasks_processing_items = Min(num_items, tasks_.size());

  // In the event of an uneven workload, distribute an extra item to the first
  // |items_remainder| tasks.
  const size_t items_remainder = num_tasks_processing_items > 0
                                     ? num_items % num_tasks_processing_items
                                     : 0;
  // Base |items_per_task|; it will be bumped by 1 for the first
  // |items_remainder| tasks.
  const size_t items_per_task = num_tasks_processing_items > 0
                                    ? num_items / num_tasks_processing_items
                                    : 0;
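  // Worked example: with num_items = 10 and num_tasks_processing_items = 3,
  // items_per_task = 10 / 3 = 3 and items_remainder = 10 % 3 = 1, so one
  // task is seeded with an extra start item and the other two with
  // items_per_task each.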
  CancelableTaskManager::Id* task_ids =
      new CancelableTaskManager::Id[num_tasks];
  std::unique_ptr<Task> main_task;
  for (size_t i = 0, start_index = 0; i < num_tasks;
       i++, start_index += items_per_task + (i < items_remainder ? 1 : 0)) {
    auto task = std::move(tasks_[i]);
    DCHECK(task);

    // By definition there are fewer |items_remainder| items to distribute
    // than there are tasks processing items, so this cannot overflow while
    // we are assigning work items.
    DCHECK_IMPLIES(start_index >= num_items, i >= num_tasks_processing_items);

    // Only tasks posted to worker threads get the latency histogram; the
    // first task runs synchronously on this thread below, so it has no
    // scheduling latency to report.
    task->SetupInternal(pending_tasks_, &items_, start_index,
                        i > 0 ? gc_parallel_task_latency_histogram
                              : base::Optional<AsyncTimedHistogram>());
    task_ids[i] = task->id();
    if (i > 0) {
      V8::GetCurrentPlatform()->CallBlockingTaskOnWorkerThread(std::move(task));
    } else {
      main_task = std::move(task);
    }
  }

  // Contribute on main thread.
  DCHECK(main_task);
  main_task->Run();

  // Wait for background tasks. A task that was successfully aborted never
  // ran and will never signal |pending_tasks_|, so only wait for the ones
  // that could not be aborted.
  for (size_t i = 0; i < num_tasks; i++) {
    if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
        CancelableTaskManager::kTaskAborted) {
      pending_tasks_->Wait();
    }
  }
  delete[] task_ids;
}
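
// Usage sketch (illustrative; ExampleItem/ExampleTask are hypothetical
// subclasses, and AddTask()/AddItem() are assumed to be the registration
// helpers declared in item-parallel-job.h):
//
//   ItemParallelJob job(isolate->cancelable_task_manager(), &semaphore);
//   for (auto* chunk : chunks) job.AddItem(new ExampleItem(chunk));
//   for (int i = 0; i < num_tasks; i++) job.AddTask(new ExampleTask(isolate));
//   job.Run(isolate->async_counters());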

}  // namespace internal
}  // namespace v8