1 /* 2 * Copyright (c) 2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMON_COMPONENTS_HEAP_COLLECTOR_TASK_QUEUE_H 17 #define COMMON_COMPONENTS_HEAP_COLLECTOR_TASK_QUEUE_H 18 19 #include <condition_variable> 20 #include <cstdint> 21 #include <list> 22 23 #include "common_components/common/page_allocator.h" 24 #include "common_components/heap/collector/gc_request.h" 25 #include "common_components/heap/heap.h" 26 #include "common_components/log/log.h" 27 28 // gc task and task queue implementation 29 namespace common { 30 class GCTask { 31 public: 32 enum class GCTaskType : uint32_t { 33 GC_TASK_INVALID = 0, 34 GC_TASK_TERMINATE_GC = 1, // terminate gc 35 GC_TASK_INVOKE_GC = 2, // invoke gc 36 GC_TASK_DUMP_HEAP = 3, // dump heap 37 GC_TASK_DUMP_HEAP_OOM = 4, // dump heap after oom 38 GC_TASK_DUMP_HEAP_IDE = 5, // dump heap for IDE 39 }; 40 41 enum GCTaskIndex : uint64_t { 42 INVALID_TASK_INDEX = 0, 43 TASK_INDEX_ASYNC_GC = 1, 44 45 // sync task index is among range [TASK_INDEX_SYNC_GC_MIN, TASK_INDEX_SYNC_GC_MAX). 46 TASK_INDEX_SYNC_GC_MIN = 2, 47 TASK_INDEX_SYNC_GC_MAX = std::numeric_limits<uint64_t>::max(), 48 TASK_INDEX_GC_EXIT = TASK_INDEX_SYNC_GC_MAX, 49 }; 50 GCTask(GCTaskType type)51 explicit GCTask(GCTaskType type) : taskType_(type), taskIndex_(TASK_INDEX_ASYNC_GC) {} 52 virtual ~GCTask() = default; 53 GetTaskType()54 GCTaskType GetTaskType() const { return taskType_; } 55 SetTaskType(GCTaskType type)56 void SetTaskType(GCTaskType type) { taskType_ = type; } 57 GetTaskIndex()58 GCTaskIndex GetTaskIndex() const { return taskIndex_; } 59 SetTaskIndex(GCTaskIndex index)60 void SetTaskIndex(GCTaskIndex index) { taskIndex_ = index; } 61 NeedFilter()62 virtual bool NeedFilter() const { return false; } 63 64 virtual bool Execute(void* owner) = 0; 65 66 protected: 67 GCTask(const GCTask& task) = default; 68 virtual GCTask& operator=(const GCTask&) = default; 69 GCTaskType taskType_; 70 GCTaskIndex taskIndex_; 71 }; 72 73 class GCRunner : public GCTask { 74 public: 75 // For a task, we give it a priority based on schedule type and gc reason. 76 // Termination and timeout events get highest prio, and override lower-prio tasks. 77 // Each gc invocation task gets its prio relative to its reason. 78 // This prio is used by the async task queue. 79 static constexpr uint32_t PRIO_TERMINATE = 0; 80 static constexpr uint32_t PRIO_TIMEOUT = 1; 81 static constexpr uint32_t PRIO_INVOKE_GC = 2; 82 83 static_assert(PRIO_INVOKE_GC + static_cast<uint32_t>(GC_REASON_END) < std::numeric_limits<uint32_t>::digits, 84 "task queue reached max capacity"); 85 GCRunner()86 GCRunner() : GCTask(GCTaskType::GC_TASK_INVALID), gcReason_(GC_REASON_INVALID) {} 87 GCRunner(GCTaskType type)88 explicit GCRunner(GCTaskType type) : GCTask(type), gcReason_(GC_REASON_INVALID) 89 { 90 ASSERT_LOGF(type != GCTaskType::GC_TASK_INVOKE_GC, "invalid gc task!"); 91 } 92 93 GCRunner(GCTaskType type, GCReason reason, GCType gcType = GC_TYPE_FULL) GCTask(type)94 : GCTask(type), gcReason_(reason), gcType_(gcType) 95 { 96 ASSERT_LOGF(gcReason_ >= GC_REASON_BEGIN && gcReason_ <= GC_REASON_END, "invalid reason"); 97 ASSERT_LOGF(gcType_ >= GC_TYPE_BEGIN && gcType_ <= GC_TYPE_END, "invalid gc type"); 98 } 99 100 GCRunner(const GCRunner& task) = default; 101 ~GCRunner() override = default; 102 GCRunner& operator=(const GCRunner&) = default; 103 GetGCRunner(uint32_t prio)104 static inline GCRunner GetGCRunner(uint32_t prio) 105 { 106 if (prio == PRIO_TERMINATE) { 107 return GCRunner(GCTaskType::GC_TASK_TERMINATE_GC); 108 } else if (prio - PRIO_INVOKE_GC <= GC_REASON_END) { 109 auto reason = static_cast<GCReason>(prio - PRIO_INVOKE_GC); 110 auto gcType = reason == GC_REASON_YOUNG ? GC_TYPE_YOUNG : GC_TYPE_FULL; 111 return GCRunner(GCTaskType::GC_TASK_INVOKE_GC, reason, gcType); 112 } else { //LCOV_EXCL_BR_LINE 113 LOG_COMMON(FATAL) << "Invalid priority in GetGCRequestByPrio function"; 114 UNREACHABLE_CC(); 115 return GCRunner(); 116 } 117 } 118 GetPriority()119 inline uint32_t GetPriority() const 120 { 121 if (taskType_ == GCTaskType::GC_TASK_TERMINATE_GC) { 122 return PRIO_TERMINATE; 123 } else if (taskType_ == GCTaskType::GC_TASK_INVOKE_GC) { 124 return PRIO_INVOKE_GC + gcReason_; 125 } 126 LOG_COMMON(FATAL) << "Invalid task in GetPriority function"; 127 UNREACHABLE_CC(); 128 return 0; 129 } 130 GetInvalidExecutor()131 static inline GCRunner GetInvalidExecutor() { return GCRunner(); } 132 IsInvalid()133 inline bool IsInvalid() const 134 { 135 return (taskType_ == GCTaskType::GC_TASK_INVALID) && (gcReason_ == GC_REASON_INVALID); 136 } 137 138 // Only for asyn gc task queues, 139 // the TaskType::GC_TASK_TERMINATE_GC gc task will remove all others IsOverriding()140 inline bool IsOverriding() const { return (taskType_ != GCTaskType::GC_TASK_INVOKE_GC); } 141 GetGCReason()142 inline GCReason GetGCReason() const { return gcReason_; } 143 SetGCReason(GCReason reason)144 inline void SetGCReason(GCReason reason) { gcReason_ = reason; } 145 GetGCType()146 inline GCType GetGCType() const { return gcType_; } 147 SetGCType(GCType type)148 inline void SetGCType(GCType type) { gcType_ = type; } 149 NeedFilter()150 bool NeedFilter() const override { return true; } 151 152 bool Execute(void* owner) override; 153 154 private: 155 GCReason gcReason_ { GC_REASON_INVALID }; 156 GCType gcType_ { GC_TYPE_FULL }; 157 }; 158 159 // Lockless async task queue implementation. 160 // This queue manages a list of deduplicated tasks. 161 // Each bit of the queueWord indicates the corresponding priority task. 162 // Lower bit indicates higher priority task. 163 template<typename Type> 164 class GCLocklessTaskQueue { 165 public: 166 // Add one async task to asyncTaskQueue, one higher priority task might erase all lower-priority tasks in queueWord Push(const Type & task)167 void Push(const Type& task) 168 { 169 uint32_t nextWord{ 0 }; 170 bool overriding{ task.IsOverriding() }; 171 uint32_t taskMask{ (1U << task.GetPriority()) }; 172 uint32_t curuentWord{ queueWord_.load(std::memory_order_relaxed) }; 173 do { 174 if (overriding) { 175 nextWord = taskMask | ((taskMask - 1) & curuentWord); 176 } else { 177 nextWord = taskMask | curuentWord; 178 } 179 } while (!queueWord_.compare_exchange_weak(curuentWord, nextWord, std::memory_order_relaxed)); 180 } 181 182 // Get the highest priority task in queueWord 183 // Or get one invalid task if queueWord is empty Pop()184 Type Pop() 185 { 186 uint32_t nextWord{ 0 }; 187 uint32_t currentWord{ queueWord_.load(std::memory_order_relaxed) }; 188 uint32_t dequeued{ currentWord }; 189 do { 190 nextWord = currentWord & (currentWord - 1); 191 dequeued = currentWord; 192 } while (!queueWord_.compare_exchange_weak(currentWord, nextWord, std::memory_order_relaxed)); 193 194 if (currentWord == 0) { 195 return Type::GetInvalidExecutor(); 196 } 197 // get the count of trailing zeros 198 return Type::GetGCRunner(__builtin_ctz(dequeued)); 199 } 200 201 // When gc thread exits, clear all tasks in queueWord Clear()202 void Clear() { queueWord_.store(0, std::memory_order_relaxed); } 203 204 private: 205 std::atomic<uint32_t> queueWord_ = {}; 206 }; 207 208 template<typename Type> 209 class GCTaskQueue { 210 static_assert(std::is_base_of<GCTask, Type>::value, "T is not a subclass of GCTask"); 211 212 public: 213 using GCTaskFilter = std::function<bool(Type& oldTask, Type& newTask)>; 214 using GCTaskQueueType = std::list<Type, StdContainerAllocator<Type, GC_TASK_QUEUE>>; 215 Init()216 void Init() { syncTaskIndex_ = GCTask::TASK_INDEX_SYNC_GC_MIN; } 217 Finish()218 void Finish() 219 { 220 std::lock_guard<std::recursive_mutex> lock(taskQueueLock_); 221 asyncTaskQueue_.Clear(); 222 syncTaskQueue_.clear(); 223 } 224 225 // Add one task to syncTaskQueue 226 // Return the accumulated gc times EnqueueSync(Type & task,GCTaskFilter & filter)227 uint64_t EnqueueSync(Type& task, GCTaskFilter& filter) 228 { 229 std::unique_lock<std::recursive_mutex> lock(taskQueueLock_); 230 GCTaskQueueType& queue = syncTaskQueue_; 231 232 if (!queue.empty() && task.NeedFilter()) { 233 for (auto iter = queue.rbegin(); iter != queue.rend(); ++iter) { 234 if (filter(*iter, task)) { 235 return (*iter).GetTaskIndex(); 236 } 237 } 238 } 239 task.SetTaskIndex(static_cast<GCTask::GCTaskIndex>(++syncTaskIndex_)); 240 queue.push_back(task); 241 taskQueueCondVar_.notify_all(); 242 return task.GetTaskIndex(); 243 } 244 245 // Add one task to asyncTaskQueue EnqueueAsync(const Type & task)246 void EnqueueAsync(const Type& task) 247 { 248 asyncTaskQueue_.Push(task); 249 std::unique_lock<std::recursive_mutex> lock(taskQueueLock_); 250 taskQueueCondVar_.notify_all(); 251 } 252 253 // Retrieve a garbage collection task from the task queue 254 // Prioritize synchronous tasks from syncTaskQueue before asynchronous ones from asyncTaskQueue Dequeue()255 Type Dequeue() 256 { 257 std::chrono::nanoseconds waitTime(DEFAULT_GC_TASK_INTERVAL_TIMEOUT_NS); 258 std::cv_status cvStatus = std::cv_status::no_timeout; 259 while (true) { 260 std::unique_lock<std::recursive_mutex> lock(taskQueueLock_); 261 // Prioritize synchronous task queue first 262 if (!syncTaskQueue_.empty()) { 263 Type currentTask(syncTaskQueue_.front()); 264 syncTaskQueue_.pop_front(); 265 return currentTask; 266 } 267 268 // Retrieve the task and then process data with dfx 269 Type task = asyncTaskQueue_.Pop(); 270 if (task.IsInvalid()) { 271 VLOG(DEBUG, "invalid gc task: type %u, reason %u", task.GetTaskType(), task.GetGCReason()); 272 } else { 273 VLOG(DEBUG, "dequeue gc task: type %u. reason %u", task.GetTaskType(), task.GetGCReason()); 274 return task; 275 } 276 277 cvStatus = taskQueueCondVar_.wait_for(lock, waitTime); 278 } 279 } 280 281 // GC thread poll task queue and execute gc task DrainTaskQueue(void * owner)282 void DrainTaskQueue(void* owner) 283 { 284 while (true) { 285 Type task = Dequeue(); 286 if (!task.Execute(owner)) { 287 Finish(); 288 break; 289 } 290 } 291 } 292 293 private: 294 static constexpr uint64_t DEFAULT_GC_TASK_INTERVAL_TIMEOUT_NS = 1000L * 1000 * 1000; // default 1s 295 std::recursive_mutex taskQueueLock_; 296 std::condition_variable_any taskQueueCondVar_; 297 uint64_t syncTaskIndex_ = 0; 298 GCTaskQueueType syncTaskQueue_; 299 GCLocklessTaskQueue<Type> asyncTaskQueue_; 300 }; 301 } // namespace common 302 303 #endif // COMMON_COMPONENTS_HEAP_COLLECTOR_TASK_QUEUE_H 304