1 // Copyright 2017 the V8 project authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_ 6 #define V8_HEAP_ITEM_PARALLEL_JOB_H_ 7 8 #include <memory> 9 #include <vector> 10 11 #include "src/base/atomic-utils.h" 12 #include "src/base/logging.h" 13 #include "src/base/macros.h" 14 #include "src/common/globals.h" 15 #include "src/tasks/cancelable-task.h" 16 17 namespace v8 { 18 19 namespace base { 20 class Semaphore; 21 } // namespace base 22 23 namespace internal { 24 25 class Counters; 26 class Isolate; 27 28 // This class manages background tasks that process a set of items in parallel. 29 // The first task added is executed on the same thread as |job.Run()| is called. 30 // All other tasks are scheduled in the background. 31 // 32 // - Items need to inherit from ItemParallelJob::Item. 33 // - Tasks need to inherit from ItemParallelJob::Task. 34 // 35 // Items need to be marked as finished after processing them. Task and Item 36 // ownership is transferred to the job. 37 class V8_EXPORT_PRIVATE ItemParallelJob { 38 public: 39 class Task; 40 41 class V8_EXPORT_PRIVATE Item { 42 public: 43 Item() = default; 44 virtual ~Item() = default; 45 46 // Marks an item as being finished. MarkFinished()47 void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); } 48 49 private: 50 enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished }; 51 TryMarkingAsProcessing()52 bool TryMarkingAsProcessing() { 53 ProcessingState available = kAvailable; 54 return state_.compare_exchange_strong(available, kProcessing); 55 } IsFinished()56 bool IsFinished() { return state_ == kFinished; } 57 58 std::atomic<ProcessingState> state_{kAvailable}; 59 60 friend class ItemParallelJob; 61 friend class ItemParallelJob::Task; 62 63 DISALLOW_COPY_AND_ASSIGN(Item); 64 }; 65 66 class V8_EXPORT_PRIVATE Task : public CancelableTask { 67 public: 68 enum class Runner { kForeground, kBackground }; 69 explicit Task(Isolate* isolate); 70 ~Task() override = default; 71 72 virtual void RunInParallel(Runner runner) = 0; 73 74 protected: 75 // Retrieves a new item that needs to be processed. Returns |nullptr| if 76 // all items are processed. Upon returning an item, the task is required 77 // to process the item and mark the item as finished after doing so. 78 template <class ItemType> GetItem()79 ItemType* GetItem() { 80 while (items_considered_++ != items_->size()) { 81 // Wrap around. 82 if (cur_index_ == items_->size()) { 83 cur_index_ = 0; 84 } 85 Item* item = (*items_)[cur_index_++]; 86 if (item->TryMarkingAsProcessing()) { 87 return static_cast<ItemType*>(item); 88 } 89 } 90 return nullptr; 91 } 92 93 private: 94 friend class ItemParallelJob; 95 friend class Item; 96 97 // Sets up state required before invoking Run(). If 98 // |start_index is >= items_.size()|, this task will not process work items 99 // (some jobs have more tasks than work items in order to parallelize post- 100 // processing, e.g. scavenging). 101 void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items, 102 size_t start_index); 103 void WillRunOnForeground(); 104 // We don't allow overriding this method any further. 105 void RunInternal() final; 106 107 std::vector<Item*>* items_ = nullptr; 108 size_t cur_index_ = 0; 109 size_t items_considered_ = 0; 110 Runner runner_ = Runner::kBackground; 111 base::Semaphore* on_finish_ = nullptr; 112 113 DISALLOW_COPY_AND_ASSIGN(Task); 114 }; 115 116 ItemParallelJob(CancelableTaskManager* cancelable_task_manager, 117 base::Semaphore* pending_tasks); 118 119 ~ItemParallelJob(); 120 121 // Adds a task to the job. Transfers ownership to the job. AddTask(Task * task)122 void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); } 123 124 // Adds an item to the job. Transfers ownership to the job. AddItem(Item * item)125 void AddItem(Item* item) { items_.push_back(item); } 126 NumberOfItems()127 int NumberOfItems() const { return static_cast<int>(items_.size()); } NumberOfTasks()128 int NumberOfTasks() const { return static_cast<int>(tasks_.size()); } 129 130 // Runs this job. 131 void Run(); 132 133 private: 134 std::vector<Item*> items_; 135 std::vector<std::unique_ptr<Task>> tasks_; 136 CancelableTaskManager* cancelable_task_manager_; 137 base::Semaphore* pending_tasks_; 138 139 DISALLOW_COPY_AND_ASSIGN(ItemParallelJob); 140 }; 141 142 } // namespace internal 143 } // namespace v8 144 145 #endif // V8_HEAP_ITEM_PARALLEL_JOB_H_ 146