• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef V8_HEAP_ITEM_PARALLEL_JOB_H_
6 #define V8_HEAP_ITEM_PARALLEL_JOB_H_
7 
8 #include <memory>
9 #include <vector>
10 
11 #include "src/base/atomic-utils.h"
12 #include "src/base/logging.h"
13 #include "src/base/macros.h"
14 #include "src/common/globals.h"
15 #include "src/tasks/cancelable-task.h"
16 
17 namespace v8 {
18 
19 namespace base {
20 class Semaphore;
21 }  // namespace base
22 
23 namespace internal {
24 
25 class Counters;
26 class Isolate;
27 
28 // This class manages background tasks that process a set of items in parallel.
29 // The first task added is executed on the same thread as |job.Run()| is called.
30 // All other tasks are scheduled in the background.
31 //
32 // - Items need to inherit from ItemParallelJob::Item.
33 // - Tasks need to inherit from ItemParallelJob::Task.
34 //
35 // Items need to be marked as finished after processing them. Task and Item
36 // ownership is transferred to the job.
37 class V8_EXPORT_PRIVATE ItemParallelJob {
38  public:
39   class Task;
40 
41   class V8_EXPORT_PRIVATE Item {
42    public:
43     Item() = default;
44     virtual ~Item() = default;
45 
46     // Marks an item as being finished.
MarkFinished()47     void MarkFinished() { CHECK_EQ(kProcessing, state_.exchange(kFinished)); }
48 
49    private:
50     enum ProcessingState : uintptr_t { kAvailable, kProcessing, kFinished };
51 
TryMarkingAsProcessing()52     bool TryMarkingAsProcessing() {
53       ProcessingState available = kAvailable;
54       return state_.compare_exchange_strong(available, kProcessing);
55     }
IsFinished()56     bool IsFinished() { return state_ == kFinished; }
57 
58     std::atomic<ProcessingState> state_{kAvailable};
59 
60     friend class ItemParallelJob;
61     friend class ItemParallelJob::Task;
62 
63     DISALLOW_COPY_AND_ASSIGN(Item);
64   };
65 
66   class V8_EXPORT_PRIVATE Task : public CancelableTask {
67    public:
68     enum class Runner { kForeground, kBackground };
69     explicit Task(Isolate* isolate);
70     ~Task() override = default;
71 
72     virtual void RunInParallel(Runner runner) = 0;
73 
74    protected:
75     // Retrieves a new item that needs to be processed. Returns |nullptr| if
76     // all items are processed. Upon returning an item, the task is required
77     // to process the item and mark the item as finished after doing so.
78     template <class ItemType>
GetItem()79     ItemType* GetItem() {
80       while (items_considered_++ != items_->size()) {
81         // Wrap around.
82         if (cur_index_ == items_->size()) {
83           cur_index_ = 0;
84         }
85         Item* item = (*items_)[cur_index_++];
86         if (item->TryMarkingAsProcessing()) {
87           return static_cast<ItemType*>(item);
88         }
89       }
90       return nullptr;
91     }
92 
93    private:
94     friend class ItemParallelJob;
95     friend class Item;
96 
97     // Sets up state required before invoking Run(). If
98     // |start_index is >= items_.size()|, this task will not process work items
99     // (some jobs have more tasks than work items in order to parallelize post-
100     // processing, e.g. scavenging).
101     void SetupInternal(base::Semaphore* on_finish, std::vector<Item*>* items,
102                        size_t start_index);
103     void WillRunOnForeground();
104     // We don't allow overriding this method any further.
105     void RunInternal() final;
106 
107     std::vector<Item*>* items_ = nullptr;
108     size_t cur_index_ = 0;
109     size_t items_considered_ = 0;
110     Runner runner_ = Runner::kBackground;
111     base::Semaphore* on_finish_ = nullptr;
112 
113     DISALLOW_COPY_AND_ASSIGN(Task);
114   };
115 
116   ItemParallelJob(CancelableTaskManager* cancelable_task_manager,
117                   base::Semaphore* pending_tasks);
118 
119   ~ItemParallelJob();
120 
121   // Adds a task to the job. Transfers ownership to the job.
AddTask(Task * task)122   void AddTask(Task* task) { tasks_.push_back(std::unique_ptr<Task>(task)); }
123 
124   // Adds an item to the job. Transfers ownership to the job.
AddItem(Item * item)125   void AddItem(Item* item) { items_.push_back(item); }
126 
NumberOfItems()127   int NumberOfItems() const { return static_cast<int>(items_.size()); }
NumberOfTasks()128   int NumberOfTasks() const { return static_cast<int>(tasks_.size()); }
129 
130   // Runs this job.
131   void Run();
132 
133  private:
134   std::vector<Item*> items_;
135   std::vector<std::unique_ptr<Task>> tasks_;
136   CancelableTaskManager* cancelable_task_manager_;
137   base::Semaphore* pending_tasks_;
138 
139   DISALLOW_COPY_AND_ASSIGN(ItemParallelJob);
140 };
141 
142 }  // namespace internal
143 }  // namespace v8
144 
145 #endif  // V8_HEAP_ITEM_PARALLEL_JOB_H_
146