• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef LIBPANDABASE_TASKMANAGER_UTILS_WORKER_THREAD_LOCAL_QUEUE_H
17 #define LIBPANDABASE_TASKMANAGER_UTILS_WORKER_THREAD_LOCAL_QUEUE_H
18 
19 #include "libpandabase/taskmanager/task.h"
20 #include "libpandabase/taskmanager/utils/sp_mc_lock_free_queue.h"
21 #include "libpandabase/os/mutex.h"
22 #include <optional>
23 #include <unordered_map>
24 
25 namespace ark::taskmanager::internal {
26 
27 template <size_t WORKER_QUEUE_SIZE>
28 class WorkerThreadLocalQueue {
29 public:
WorkerThreadLocalQueue()30     WorkerThreadLocalQueue()
31     {
32         for (TaskType taskType : ALL_TASK_TYPES) {
33             for (VMType vmType : ALL_VM_TYPES) {
34                 for (TaskExecutionMode executionMode : ALL_TASK_EXECUTION_MODES) {
35                     TaskProperties priority(taskType, vmType, executionMode);
36                     perPropertiesQueue_[priority];
37                 }
38             }
39         }
40     }
41     ~WorkerThreadLocalQueue() = default;
42     NO_COPY_SEMANTIC(WorkerThreadLocalQueue);
43     NO_MOVE_SEMANTIC(WorkerThreadLocalQueue);
44 
RegisterConsumer()45     size_t RegisterConsumer()
46     {
47         os::memory::LockHolder<os::memory::Mutex> lockHolder(registerLock_);
48         size_t id = registerNumber_;
49         for (TaskType taskType : ALL_TASK_TYPES) {
50             for (VMType vmType : ALL_VM_TYPES) {
51                 for (TaskExecutionMode executionMode : ALL_TASK_EXECUTION_MODES) {
52                     TaskProperties priority(taskType, vmType, executionMode);
53                     [[maybe_unused]] auto idInQueue = perPropertiesQueue_.at(priority).RegisterConsumer();
54                     ASSERT(id == idInQueue);
55                 }
56             }
57         }
58         registerNumber_++;
59         return id;
60     }
61 
Push(Task && task)62     void Push(Task &&task)
63     {
64         auto properties = task.GetTaskProperties();
65         ASSERT(!task.IsInvalid());
66         perPropertiesQueue_.at(properties).Push(std::move(task));
67         // Atomic with acq_rel order reason: other threads should be correct value
68         size_.fetch_add(1, std::memory_order_acq_rel);
69     }
70 
Pop(size_t id)71     std::optional<Task> Pop(size_t id)
72     {
73         std::optional<Task> result = Pop(id, TaskExecutionMode::FOREGROUND);
74         if (result.has_value()) {
75             return result;
76         }
77         return Pop(id, TaskExecutionMode::BACKGROUND);
78     }
79 
Pop(size_t id,TaskExecutionMode mode)80     std::optional<Task> Pop(size_t id, TaskExecutionMode mode)
81     {
82         for (TaskType taskType : ALL_TASK_TYPES) {
83             for (VMType vmType : ALL_VM_TYPES) {
84                 TaskProperties prop(taskType, vmType, mode);
85                 auto task = Pop(id, prop);
86                 if (task.has_value()) {
87                     return task;
88                 }
89             }
90         }
91         return std::nullopt;
92     }
93 
Pop(size_t id,TaskProperties priority)94     std::optional<Task> Pop(size_t id, TaskProperties priority)
95     {
96         LocalTaskQueue &queue = perPropertiesQueue_.at(priority);
97         auto task = queue.Pop(id);
98         if (task.has_value()) {
99             // Atomic with acq_rel order reason: other threads should be correct value
100             size_.fetch_sub(1, std::memory_order_acq_rel);
101         }
102         return task;
103     }
104 
TryDeleteRetiredPtrs()105     void TryDeleteRetiredPtrs()
106     {
107         for (TaskType taskType : ALL_TASK_TYPES) {
108             for (VMType vmType : ALL_VM_TYPES) {
109                 perPropertiesQueue_[{taskType, vmType, TaskExecutionMode::BACKGROUND}].TryDeleteRetiredPtrs();
110                 perPropertiesQueue_[{taskType, vmType, TaskExecutionMode::FOREGROUND}].TryDeleteRetiredPtrs();
111             }
112         }
113     }
114 
IsEmpty()115     bool IsEmpty() const
116     {
117         return Size() == 0;
118     }
119 
Size()120     size_t Size() const
121     {
122         // Atomic with acquire order reason: need to load last value
123         return size_.load(std::memory_order_acquire);
124     }
125 
CountOfTasksWithProperties(TaskProperties properties)126     size_t CountOfTasksWithProperties(TaskProperties properties) const
127     {
128         return perPropertiesQueue_.at(properties).Size();
129     }
130 
131 private:
132     using LocalTaskQueue = internal::SPMCLockFreeQueue<Task, WORKER_QUEUE_SIZE>;
133     std::unordered_map<TaskProperties, LocalTaskQueue, TaskProperties::Hash> perPropertiesQueue_;
134     std::atomic_size_t size_ {0};
135 
136     os::memory::Mutex registerLock_;
137     size_t registerNumber_ {0};
138 };
139 
140 }  // namespace ark::taskmanager::internal
141 
142 #endif  // LIBPANDABASE_TASKMANAGER_UTILS_WORKER_THREAD_LOCAL_QUEUE_H
143