• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef V8_HEAP_PAGE_PARALLEL_JOB_
6 #define V8_HEAP_PAGE_PARALLEL_JOB_
7 
8 #include "src/allocation.h"
9 #include "src/cancelable-task.h"
10 #include "src/utils.h"
11 #include "src/v8.h"
12 
13 namespace v8 {
14 namespace internal {
15 
16 class Heap;
17 class Isolate;
18 
19 // This class manages background tasks that process set of pages in parallel.
20 // The JobTraits class needs to define:
21 // - PerPageData type - state associated with each page.
22 // - PerTaskData type - state associated with each task.
23 // - static bool ProcessPageInParallel(Heap* heap,
24 //                                     PerTaskData task_data,
25 //                                     MemoryChunk* page,
26 //                                     PerPageData page_data)
27 //   The function should return true iff processing succeeded.
28 // - static const bool NeedSequentialFinalization
29 // - static void FinalizePageSequentially(Heap* heap,
30 //                                        bool processing_succeeded,
31 //                                        MemoryChunk* page,
32 //                                        PerPageData page_data)
33 template <typename JobTraits>
34 class PageParallelJob {
35  public:
36   // PageParallelJob cannot dynamically create a semaphore because of a bug in
37   // glibc. See http://crbug.com/609249 and
38   // https://sourceware.org/bugzilla/show_bug.cgi?id=12674.
39   // The caller must provide a semaphore with value 0 and ensure that
40   // the lifetime of the semaphore is the same as the lifetime of the Isolate.
41   // It is guaranteed that the semaphore value will be 0 after Run() call.
PageParallelJob(Heap * heap,CancelableTaskManager * cancelable_task_manager,base::Semaphore * semaphore)42   PageParallelJob(Heap* heap, CancelableTaskManager* cancelable_task_manager,
43                   base::Semaphore* semaphore)
44       : heap_(heap),
45         cancelable_task_manager_(cancelable_task_manager),
46         items_(nullptr),
47         num_items_(0),
48         num_tasks_(0),
49         pending_tasks_(semaphore) {}
50 
~PageParallelJob()51   ~PageParallelJob() {
52     Item* item = items_;
53     while (item != nullptr) {
54       Item* next = item->next;
55       delete item;
56       item = next;
57     }
58   }
59 
AddPage(MemoryChunk * chunk,typename JobTraits::PerPageData data)60   void AddPage(MemoryChunk* chunk, typename JobTraits::PerPageData data) {
61     Item* item = new Item(chunk, data, items_);
62     items_ = item;
63     ++num_items_;
64   }
65 
NumberOfPages()66   int NumberOfPages() { return num_items_; }
67 
68   // Returns the number of tasks that were spawned when running the job.
NumberOfTasks()69   int NumberOfTasks() { return num_tasks_; }
70 
71   // Runs the given number of tasks in parallel and processes the previously
72   // added pages. This function blocks until all tasks finish.
73   // The callback takes the index of a task and returns data for that task.
74   template <typename Callback>
Run(int num_tasks,Callback per_task_data_callback)75   void Run(int num_tasks, Callback per_task_data_callback) {
76     if (num_items_ == 0) return;
77     DCHECK_GE(num_tasks, 1);
78     uint32_t task_ids[kMaxNumberOfTasks];
79     const int max_num_tasks = Min(
80         kMaxNumberOfTasks,
81         static_cast<int>(
82             V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads()));
83     num_tasks_ = Max(1, Min(num_tasks, max_num_tasks));
84     int items_per_task = (num_items_ + num_tasks_ - 1) / num_tasks_;
85     int start_index = 0;
86     Task* main_task = nullptr;
87     for (int i = 0; i < num_tasks_; i++, start_index += items_per_task) {
88       if (start_index >= num_items_) {
89         start_index -= num_items_;
90       }
91       Task* task = new Task(heap_, items_, num_items_, start_index,
92                             pending_tasks_, per_task_data_callback(i));
93       task_ids[i] = task->id();
94       if (i > 0) {
95         V8::GetCurrentPlatform()->CallOnBackgroundThread(
96             task, v8::Platform::kShortRunningTask);
97       } else {
98         main_task = task;
99       }
100     }
101     // Contribute on main thread.
102     main_task->Run();
103     delete main_task;
104     // Wait for background tasks.
105     for (int i = 0; i < num_tasks_; i++) {
106       if (!cancelable_task_manager_->TryAbort(task_ids[i])) {
107         pending_tasks_->Wait();
108       }
109     }
110     if (JobTraits::NeedSequentialFinalization) {
111       Item* item = items_;
112       while (item != nullptr) {
113         bool success = (item->state.Value() == kFinished);
114         JobTraits::FinalizePageSequentially(heap_, item->chunk, success,
115                                             item->data);
116         item = item->next;
117       }
118     }
119   }
120 
121  private:
122   static const int kMaxNumberOfTasks = 10;
123 
124   enum ProcessingState { kAvailable, kProcessing, kFinished, kFailed };
125 
126   struct Item : public Malloced {
ItemItem127     Item(MemoryChunk* chunk, typename JobTraits::PerPageData data, Item* next)
128         : chunk(chunk), state(kAvailable), data(data), next(next) {}
129     MemoryChunk* chunk;
130     base::AtomicValue<ProcessingState> state;
131     typename JobTraits::PerPageData data;
132     Item* next;
133   };
134 
135   class Task : public CancelableTask {
136    public:
Task(Heap * heap,Item * items,int num_items,int start_index,base::Semaphore * on_finish,typename JobTraits::PerTaskData data)137     Task(Heap* heap, Item* items, int num_items, int start_index,
138          base::Semaphore* on_finish, typename JobTraits::PerTaskData data)
139         : CancelableTask(heap->isolate()),
140           heap_(heap),
141           items_(items),
142           num_items_(num_items),
143           start_index_(start_index),
144           on_finish_(on_finish),
145           data_(data) {}
146 
~Task()147     virtual ~Task() {}
148 
149    private:
150     // v8::internal::CancelableTask overrides.
RunInternal()151     void RunInternal() override {
152       // Each task starts at a different index to improve parallelization.
153       Item* current = items_;
154       int skip = start_index_;
155       while (skip-- > 0) {
156         current = current->next;
157       }
158       for (int i = 0; i < num_items_; i++) {
159         if (current->state.TrySetValue(kAvailable, kProcessing)) {
160           bool success = JobTraits::ProcessPageInParallel(
161               heap_, data_, current->chunk, current->data);
162           current->state.SetValue(success ? kFinished : kFailed);
163         }
164         current = current->next;
165         // Wrap around if needed.
166         if (current == nullptr) {
167           current = items_;
168         }
169       }
170       on_finish_->Signal();
171     }
172 
173     Heap* heap_;
174     Item* items_;
175     int num_items_;
176     int start_index_;
177     base::Semaphore* on_finish_;
178     typename JobTraits::PerTaskData data_;
179     DISALLOW_COPY_AND_ASSIGN(Task);
180   };
181 
182   Heap* heap_;
183   CancelableTaskManager* cancelable_task_manager_;
184   Item* items_;
185   int num_items_;
186   int num_tasks_;
187   base::Semaphore* pending_tasks_;
188   DISALLOW_COPY_AND_ASSIGN(PageParallelJob);
189 };
190 
191 }  // namespace internal
192 }  // namespace v8
193 
194 #endif  // V8_HEAP_PAGE_PARALLEL_JOB_
195