1 // Copyright 2017 the V8 project authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_ 6 #define V8_HEAP_ITEM_PARALLEL_JOB_H_ 7 8 #include <memory> 9 #include <vector> 10 11 #include "src/base/atomic-utils.h" 12 #include "src/base/logging.h" 13 #include "src/base/macros.h" 14 #include "src/base/optional.h" 15 #include "src/cancelable-task.h" 16 #include "src/counters.h" 17 #include "src/globals.h" 18 19 namespace v8 { 20 21 namespace base { 22 class Semaphore; 23 } 24 25 namespace internal { 26 27 class Counters; 28 class Isolate; 29 30 // This class manages background tasks that process a set of items in parallel. 31 // The first task added is executed on the same thread as |job.Run()| is called. 32 // All other tasks are scheduled in the background. 33 // 34 // - Items need to inherit from ItemParallelJob::Item. 35 // - Tasks need to inherit from ItemParallelJob::Task. 36 // 37 // Items need to be marked as finished after processing them. Task and Item 38 // ownership is transferred to the job. 39 // 40 // Each parallel (non-main thread) task will report the time between the job 41 // being created and it being scheduled to |gc_parallel_task_latency_histogram|. 42 class V8_EXPORT_PRIVATE ItemParallelJob { 43 public: 44 class Task; 45 46 class V8_EXPORT_PRIVATE Item { 47 public: 48 Item() = default; 49 virtual ~Item() = default; 50 51 // Marks an item as being finished. MarkFinished()52 void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); } 53 54 private: 55 enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished }; 56 TryMarkingAsProcessing()57 bool TryMarkingAsProcessing() { 58 ProcessingState available = kAvailable; 59 return state_.compare_exchange_strong(available, kProcessing); 60 } IsFinished()61 bool IsFinished() { return state_ == kFinished; } 62 63 std::atomic<ProcessingState> state_{kAvailable}; 64 65 friend class ItemParallelJob; 66 friend class ItemParallelJob::Task; 67 68 DISALLOW_COPY_AND_ASSIGN(Item); 69 }; 70 71 class V8_EXPORT_PRIVATE Task : public CancelableTask { 72 public: 73 explicit Task(Isolate* isolate); 74 virtual ~Task(); 75 76 virtual void RunInParallel() = 0; 77 78 protected: 79 // Retrieves a new item that needs to be processed. Returns |nullptr| if 80 // all items are processed. Upon returning an item, the task is required 81 // to process the item and mark the item as finished after doing so. 82 template <class ItemType> GetItem()83 ItemType* GetItem() { 84 while (items_considered_++ != items_->size()) { 85 // Wrap around. 86 if (cur_index_ == items_->size()) { 87 cur_index_ = 0; 88 } 89 Item* item = (*items_)[cur_index_++]; 90 if (item->TryMarkingAsProcessing()) { 91 return static_cast<ItemType*>(item); 92 } 93 } 94 return nullptr; 95 } 96 97 private: 98 friend class ItemParallelJob; 99 friend class Item; 100 101 // Sets up state required before invoking Run(). If 102 // |start_index is >= items_.size()|, this task will not process work items 103 // (some jobs have more tasks than work items in order to parallelize post- 104 // processing, e.g. scavenging). If |gc_parallel_task_latency_histogram| is 105 // provided, it will be used to report histograms on the latency between 106 // posting the task and it being scheduled. 107 void SetupInternal( 108 base::Semaphore* on_finish, std::vector<Item*>* items, 109 size_t start_index, 110 base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram); 111 112 // We don't allow overriding this method any further. 113 void RunInternal() final; 114 115 std::vector<Item*>* items_ = nullptr; 116 size_t cur_index_ = 0; 117 size_t items_considered_ = 0; 118 base::Semaphore* on_finish_ = nullptr; 119 base::Optional<AsyncTimedHistogram> gc_parallel_task_latency_histogram_; 120 121 DISALLOW_COPY_AND_ASSIGN(Task); 122 }; 123 124 ItemParallelJob(CancelableTaskManager* cancelable_task_manager, 125 base::Semaphore* pending_tasks); 126 127 ~ItemParallelJob(); 128 129 // Adds a task to the job. Transfers ownership to the job. AddTask(Task * task)130 void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); } 131 132 // Adds an item to the job. Transfers ownership to the job. AddItem(Item * item)133 void AddItem(Item* item) { items_.push_back(item); } 134 NumberOfItems()135 int NumberOfItems() const { return static_cast<int>(items_.size()); } NumberOfTasks()136 int NumberOfTasks() const { return static_cast<int>(tasks_.size()); } 137 138 // Runs this job. Reporting metrics in a thread-safe manner to 139 // |async_counters|. 140 void Run(std::shared_ptr<Counters> async_counters); 141 142 private: 143 std::vector<Item*> items_; 144 std::vector<std::unique_ptr<Task>> tasks_; 145 CancelableTaskManager* cancelable_task_manager_; 146 base::Semaphore* pending_tasks_; 147 DISALLOW_COPY_AND_ASSIGN(ItemParallelJob); 148 }; 149 150 } // namespace internal 151 } // namespace v8 152 153 #endif // V8_HEAP_ITEM_PARALLEL_JOB_H_ 154