1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef LIBPANDABASE_TASKMANAGER_UTILS_WORKER_THREAD_LOCAL_QUEUE_H 17 #define LIBPANDABASE_TASKMANAGER_UTILS_WORKER_THREAD_LOCAL_QUEUE_H 18 19 #include "libpandabase/taskmanager/task.h" 20 #include "libpandabase/taskmanager/utils/sp_mc_lock_free_queue.h" 21 #include "libpandabase/os/mutex.h" 22 #include <optional> 23 #include <unordered_map> 24 25 namespace ark::taskmanager::internal { 26 27 template <size_t WORKER_QUEUE_SIZE> 28 class WorkerThreadLocalQueue { 29 public: WorkerThreadLocalQueue()30 WorkerThreadLocalQueue() 31 { 32 for (TaskType taskType : ALL_TASK_TYPES) { 33 for (VMType vmType : ALL_VM_TYPES) { 34 for (TaskExecutionMode executionMode : ALL_TASK_EXECUTION_MODES) { 35 TaskProperties priority(taskType, vmType, executionMode); 36 perPropertiesQueue_[priority]; 37 } 38 } 39 } 40 } 41 ~WorkerThreadLocalQueue() = default; 42 NO_COPY_SEMANTIC(WorkerThreadLocalQueue); 43 NO_MOVE_SEMANTIC(WorkerThreadLocalQueue); 44 RegisterConsumer()45 size_t RegisterConsumer() 46 { 47 os::memory::LockHolder<os::memory::Mutex> lockHolder(registerLock_); 48 size_t id = registerNumber_; 49 for (TaskType taskType : ALL_TASK_TYPES) { 50 for (VMType vmType : ALL_VM_TYPES) { 51 for (TaskExecutionMode executionMode : ALL_TASK_EXECUTION_MODES) { 52 TaskProperties priority(taskType, vmType, executionMode); 53 [[maybe_unused]] auto idInQueue = perPropertiesQueue_.at(priority).RegisterConsumer(); 54 ASSERT(id == idInQueue); 55 } 56 } 57 } 58 registerNumber_++; 59 return id; 60 } 61 Push(Task && task)62 void Push(Task &&task) 63 { 64 auto properties = task.GetTaskProperties(); 65 ASSERT(!task.IsInvalid()); 66 perPropertiesQueue_.at(properties).Push(std::move(task)); 67 // Atomic with acq_rel order reason: other threads should be correct value 68 size_.fetch_add(1, std::memory_order_acq_rel); 69 } 70 Pop(size_t id)71 std::optional<Task> Pop(size_t id) 72 { 73 std::optional<Task> result = Pop(id, TaskExecutionMode::FOREGROUND); 74 if (result.has_value()) { 75 return result; 76 } 77 return Pop(id, TaskExecutionMode::BACKGROUND); 78 } 79 Pop(size_t id,TaskExecutionMode mode)80 std::optional<Task> Pop(size_t id, TaskExecutionMode mode) 81 { 82 for (TaskType taskType : ALL_TASK_TYPES) { 83 for (VMType vmType : ALL_VM_TYPES) { 84 TaskProperties prop(taskType, vmType, mode); 85 auto task = Pop(id, prop); 86 if (task.has_value()) { 87 return task; 88 } 89 } 90 } 91 return std::nullopt; 92 } 93 Pop(size_t id,TaskProperties priority)94 std::optional<Task> Pop(size_t id, TaskProperties priority) 95 { 96 LocalTaskQueue &queue = perPropertiesQueue_.at(priority); 97 auto task = queue.Pop(id); 98 if (task.has_value()) { 99 // Atomic with acq_rel order reason: other threads should be correct value 100 size_.fetch_sub(1, std::memory_order_acq_rel); 101 } 102 return task; 103 } 104 TryDeleteRetiredPtrs()105 void TryDeleteRetiredPtrs() 106 { 107 for (TaskType taskType : ALL_TASK_TYPES) { 108 for (VMType vmType : ALL_VM_TYPES) { 109 perPropertiesQueue_[{taskType, vmType, TaskExecutionMode::BACKGROUND}].TryDeleteRetiredPtrs(); 110 perPropertiesQueue_[{taskType, vmType, TaskExecutionMode::FOREGROUND}].TryDeleteRetiredPtrs(); 111 } 112 } 113 } 114 IsEmpty()115 bool IsEmpty() const 116 { 117 return Size() == 0; 118 } 119 Size()120 size_t Size() const 121 { 122 // Atomic with acquire order reason: need to load last value 123 return size_.load(std::memory_order_acquire); 124 } 125 CountOfTasksWithProperties(TaskProperties properties)126 size_t CountOfTasksWithProperties(TaskProperties properties) const 127 { 128 return perPropertiesQueue_.at(properties).Size(); 129 } 130 131 private: 132 using LocalTaskQueue = internal::SPMCLockFreeQueue<Task, WORKER_QUEUE_SIZE>; 133 std::unordered_map<TaskProperties, LocalTaskQueue, TaskProperties::Hash> perPropertiesQueue_; 134 std::atomic_size_t size_ {0}; 135 136 os::memory::Mutex registerLock_; 137 size_t registerNumber_ {0}; 138 }; 139 140 } // namespace ark::taskmanager::internal 141 142 #endif // LIBPANDABASE_TASKMANAGER_UTILS_WORKER_THREAD_LOCAL_QUEUE_H 143