• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 the V8 project authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef V8_LIBPLATFORM_DEFAULT_JOB_H_
6 #define V8_LIBPLATFORM_DEFAULT_JOB_H_
7 
8 #include <atomic>
9 #include <memory>
10 
11 #include "include/libplatform/libplatform-export.h"
12 #include "include/v8-platform.h"
13 #include "src/base/platform/condition-variable.h"
14 #include "src/base/platform/mutex.h"
15 
16 namespace v8 {
17 namespace platform {
18 
19 class V8_PLATFORM_EXPORT DefaultJobState
20     : public std::enable_shared_from_this<DefaultJobState> {
21  public:
22   class JobDelegate : public v8::JobDelegate {
23    public:
24     explicit JobDelegate(DefaultJobState* outer, bool is_joining_thread = false)
outer_(outer)25         : outer_(outer), is_joining_thread_(is_joining_thread) {}
26     ~JobDelegate();
27 
NotifyConcurrencyIncrease()28     void NotifyConcurrencyIncrease() override {
29       outer_->NotifyConcurrencyIncrease();
30     }
ShouldYield()31     bool ShouldYield() override {
32       // Thread-safe but may return an outdated result.
33       return outer_->is_canceled_.load(std::memory_order_relaxed);
34     }
35     uint8_t GetTaskId() override;
IsJoiningThread()36     bool IsJoiningThread() const override { return is_joining_thread_; }
37 
38    private:
39     static constexpr uint8_t kInvalidTaskId =
40         std::numeric_limits<uint8_t>::max();
41 
42     DefaultJobState* outer_;
43     uint8_t task_id_ = kInvalidTaskId;
44     bool is_joining_thread_;
45   };
46 
47   DefaultJobState(Platform* platform, std::unique_ptr<JobTask> job_task,
48                   TaskPriority priority, size_t num_worker_threads);
49   virtual ~DefaultJobState();
50 
51   void NotifyConcurrencyIncrease();
52   uint8_t AcquireTaskId();
53   void ReleaseTaskId(uint8_t task_id);
54 
55   void Join();
56   void CancelAndWait();
57   void CancelAndDetach();
58   bool IsActive();
59 
60   // Must be called before running |job_task_| for the first time. If it returns
61   // true, then the worker thread must contribute and must call DidRunTask(), or
62   // false if it should return.
63   bool CanRunFirstTask();
64   // Must be called after running |job_task_|. Returns true if the worker thread
65   // must contribute again, or false if it should return.
66   bool DidRunTask();
67 
68   void UpdatePriority(TaskPriority);
69 
70  private:
71   // Called from the joining thread. Waits for the worker count to be below or
72   // equal to max concurrency (will happen when a worker calls
73   // DidRunTask()). Returns true if the joining thread should run a task, or
74   // false if joining was completed and all other workers returned because
75   // there's no work remaining.
76   bool WaitForParticipationOpportunityLockRequired();
77 
78   // Returns GetMaxConcurrency() capped by the number of threads used by this
79   // job.
80   size_t CappedMaxConcurrency(size_t worker_count) const;
81 
82   void CallOnWorkerThread(TaskPriority priority, std::unique_ptr<Task> task);
83 
84   Platform* const platform_;
85   std::unique_ptr<JobTask> job_task_;
86 
87   // All members below are protected by |mutex_|.
88   base::Mutex mutex_;
89   TaskPriority priority_;
90   // Number of workers running this job.
91   size_t active_workers_ = 0;
92   // Number of posted tasks that aren't running this job yet.
93   size_t pending_tasks_ = 0;
94   // Indicates if the job is canceled.
95   std::atomic_bool is_canceled_{false};
96   // Number of worker threads available to schedule the worker task.
97   size_t num_worker_threads_;
98   // Signaled when a worker returns.
99   base::ConditionVariable worker_released_condition_;
100 
101   std::atomic<uint32_t> assigned_task_ids_{0};
102 };
103 
104 class V8_PLATFORM_EXPORT DefaultJobHandle : public JobHandle {
105  public:
106   explicit DefaultJobHandle(std::shared_ptr<DefaultJobState> state);
107   ~DefaultJobHandle() override;
108 
NotifyConcurrencyIncrease()109   void NotifyConcurrencyIncrease() override {
110     state_->NotifyConcurrencyIncrease();
111   }
112 
113   void Join() override;
114   void Cancel() override;
115   void CancelAndDetach() override;
IsCompleted()116   bool IsCompleted() override { return !IsActive(); }
117   bool IsActive() override;
IsRunning()118   bool IsRunning() override { return IsValid(); }
IsValid()119   bool IsValid() override { return state_ != nullptr; }
120 
UpdatePriorityEnabled()121   bool UpdatePriorityEnabled() const override { return true; }
122 
123   void UpdatePriority(TaskPriority) override;
124 
125  private:
126   std::shared_ptr<DefaultJobState> state_;
127 
128   DISALLOW_COPY_AND_ASSIGN(DefaultJobHandle);
129 };
130 
131 class DefaultJobWorker : public Task {
132  public:
DefaultJobWorker(std::weak_ptr<DefaultJobState> state,JobTask * job_task)133   DefaultJobWorker(std::weak_ptr<DefaultJobState> state, JobTask* job_task)
134       : state_(std::move(state)), job_task_(job_task) {}
135   ~DefaultJobWorker() override = default;
136 
Run()137   void Run() override {
138     auto shared_state = state_.lock();
139     if (!shared_state) return;
140     if (!shared_state->CanRunFirstTask()) return;
141     do {
142       // Scope of |delegate| must not outlive DidRunTask() so that associated
143       // state is freed before the worker becomes inactive.
144       DefaultJobState::JobDelegate delegate(shared_state.get());
145       job_task_->Run(&delegate);
146     } while (shared_state->DidRunTask());
147   }
148 
149  private:
150   friend class DefaultJob;
151 
152   std::weak_ptr<DefaultJobState> state_;
153   JobTask* job_task_;
154 
155   DISALLOW_COPY_AND_ASSIGN(DefaultJobWorker);
156 };
157 
158 }  // namespace platform
159 }  // namespace v8
160 
161 #endif  // V8_LIBPLATFORM_DEFAULT_JOB_H_
162