• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef COMMON_COMPONENTS_HEAP_COLLECTOR_TASK_QUEUE_H
17 #define COMMON_COMPONENTS_HEAP_COLLECTOR_TASK_QUEUE_H
18 
19 #include <condition_variable>
20 #include <cstdint>
21 #include <list>
22 
23 #include "common_components/common/page_allocator.h"
24 #include "common_components/heap/collector/gc_request.h"
25 #include "common_components/heap/heap.h"
26 #include "common_components/log/log.h"
27 
28 // gc task and task queue implementation
29 namespace common {
30 class GCTask {
31 public:
32     enum class GCTaskType : uint32_t {
33         GC_TASK_INVALID = 0,
34         GC_TASK_TERMINATE_GC = 1,  // terminate gc
35         GC_TASK_INVOKE_GC = 2,     // invoke gc
36         GC_TASK_DUMP_HEAP = 3,     // dump heap
37         GC_TASK_DUMP_HEAP_OOM = 4, // dump heap after oom
38         GC_TASK_DUMP_HEAP_IDE = 5, // dump heap for IDE
39     };
40 
41     enum GCTaskIndex : uint64_t {
42         INVALID_TASK_INDEX = 0,
43         TASK_INDEX_ASYNC_GC = 1,
44 
45         // sync task index is among range [TASK_INDEX_SYNC_GC_MIN, TASK_INDEX_SYNC_GC_MAX).
46         TASK_INDEX_SYNC_GC_MIN = 2,
47         TASK_INDEX_SYNC_GC_MAX = std::numeric_limits<uint64_t>::max(),
48         TASK_INDEX_GC_EXIT = TASK_INDEX_SYNC_GC_MAX,
49     };
50 
GCTask(GCTaskType type)51     explicit GCTask(GCTaskType type) : taskType_(type), taskIndex_(TASK_INDEX_ASYNC_GC) {}
52     virtual ~GCTask() = default;
53 
GetTaskType()54     GCTaskType GetTaskType() const { return taskType_; }
55 
SetTaskType(GCTaskType type)56     void SetTaskType(GCTaskType type) { taskType_ = type; }
57 
GetTaskIndex()58     GCTaskIndex GetTaskIndex() const { return taskIndex_; }
59 
SetTaskIndex(GCTaskIndex index)60     void SetTaskIndex(GCTaskIndex index) { taskIndex_ = index; }
61 
NeedFilter()62     virtual bool NeedFilter() const { return false; }
63 
64     virtual bool Execute(void* owner) = 0;
65 
66 protected:
67     GCTask(const GCTask& task) = default;
68     virtual GCTask& operator=(const GCTask&) = default;
69     GCTaskType taskType_;
70     GCTaskIndex taskIndex_;
71 };
72 
73 class GCRunner : public GCTask {
74 public:
75     // For a task, we give it a priority based on schedule type and gc reason.
76     // Termination and timeout events get highest prio, and override lower-prio tasks.
77     // Each gc invocation task gets its prio relative to its reason.
78     // This prio is used by the async task queue.
79     static constexpr uint32_t PRIO_TERMINATE = 0;
80     static constexpr uint32_t PRIO_TIMEOUT = 1;
81     static constexpr uint32_t PRIO_INVOKE_GC = 2;
82 
83     static_assert(PRIO_INVOKE_GC + static_cast<uint32_t>(GC_REASON_END) < std::numeric_limits<uint32_t>::digits,
84                   "task queue reached max capacity");
85 
GCRunner()86     GCRunner() : GCTask(GCTaskType::GC_TASK_INVALID), gcReason_(GC_REASON_INVALID) {}
87 
GCRunner(GCTaskType type)88     explicit GCRunner(GCTaskType type) : GCTask(type), gcReason_(GC_REASON_INVALID)
89     {
90         ASSERT_LOGF(type != GCTaskType::GC_TASK_INVOKE_GC, "invalid gc task!");
91     }
92 
93     GCRunner(GCTaskType type, GCReason reason, GCType gcType = GC_TYPE_FULL)
GCTask(type)94         : GCTask(type), gcReason_(reason), gcType_(gcType)
95     {
96         ASSERT_LOGF(gcReason_ >= GC_REASON_BEGIN && gcReason_ <= GC_REASON_END, "invalid reason");
97         ASSERT_LOGF(gcType_ >= GC_TYPE_BEGIN && gcType_ <= GC_TYPE_END, "invalid gc type");
98     }
99 
100     GCRunner(const GCRunner& task) = default;
101     ~GCRunner() override = default;
102     GCRunner& operator=(const GCRunner&) = default;
103 
GetGCRunner(uint32_t prio)104     static inline GCRunner GetGCRunner(uint32_t prio)
105     {
106         if (prio == PRIO_TERMINATE) {
107             return GCRunner(GCTaskType::GC_TASK_TERMINATE_GC);
108         } else if (prio - PRIO_INVOKE_GC <= GC_REASON_END) {
109             auto reason = static_cast<GCReason>(prio - PRIO_INVOKE_GC);
110             auto gcType = reason == GC_REASON_YOUNG ? GC_TYPE_YOUNG : GC_TYPE_FULL;
111             return GCRunner(GCTaskType::GC_TASK_INVOKE_GC, reason, gcType);
112         } else { //LCOV_EXCL_BR_LINE
113             LOG_COMMON(FATAL) << "Invalid priority in GetGCRequestByPrio function";
114             UNREACHABLE_CC();
115             return GCRunner();
116         }
117     }
118 
GetPriority()119     inline uint32_t GetPriority() const
120     {
121         if (taskType_ == GCTaskType::GC_TASK_TERMINATE_GC) {
122             return PRIO_TERMINATE;
123         } else if (taskType_ == GCTaskType::GC_TASK_INVOKE_GC) {
124             return PRIO_INVOKE_GC + gcReason_;
125         }
126         LOG_COMMON(FATAL) << "Invalid task in GetPriority function";
127         UNREACHABLE_CC();
128         return 0;
129     }
130 
GetInvalidExecutor()131     static inline GCRunner GetInvalidExecutor() { return GCRunner(); }
132 
IsInvalid()133     inline bool IsInvalid() const
134     {
135         return (taskType_ == GCTaskType::GC_TASK_INVALID) && (gcReason_ == GC_REASON_INVALID);
136     }
137 
138     // Only for asyn gc task queues,
139     // the TaskType::GC_TASK_TERMINATE_GC gc task will remove all others
IsOverriding()140     inline bool IsOverriding() const { return (taskType_ != GCTaskType::GC_TASK_INVOKE_GC); }
141 
GetGCReason()142     inline GCReason GetGCReason() const { return gcReason_; }
143 
SetGCReason(GCReason reason)144     inline void SetGCReason(GCReason reason) { gcReason_ = reason; }
145 
GetGCType()146     inline GCType GetGCType() const { return gcType_; }
147 
SetGCType(GCType type)148     inline void SetGCType(GCType type) { gcType_ = type; }
149 
NeedFilter()150     bool NeedFilter() const override { return true; }
151 
152     bool Execute(void* owner) override;
153 
154 private:
155     GCReason gcReason_ { GC_REASON_INVALID };
156     GCType gcType_ { GC_TYPE_FULL };
157 };
158 
159 // Lockless async task queue implementation.
160 // This queue manages a list of deduplicated tasks.
161 // Each bit of the queueWord indicates the corresponding priority task.
162 // Lower bit indicates higher priority task.
163 template<typename Type>
164 class GCLocklessTaskQueue {
165 public:
166     // Add one async task to asyncTaskQueue, one higher priority task might erase all lower-priority tasks in queueWord
Push(const Type & task)167     void Push(const Type& task)
168     {
169         uint32_t nextWord{ 0 };
170         bool overriding{ task.IsOverriding() };
171         uint32_t taskMask{ (1U << task.GetPriority()) };
172         uint32_t curuentWord{ queueWord_.load(std::memory_order_relaxed) };
173         do {
174             if (overriding) {
175                 nextWord = taskMask | ((taskMask - 1) & curuentWord);
176             } else {
177                 nextWord = taskMask | curuentWord;
178             }
179         } while (!queueWord_.compare_exchange_weak(curuentWord, nextWord, std::memory_order_relaxed));
180     }
181 
182     // Get the highest priority task in queueWord
183     // Or get one invalid task if queueWord is empty
Pop()184     Type Pop()
185     {
186         uint32_t nextWord{ 0 };
187         uint32_t currentWord{ queueWord_.load(std::memory_order_relaxed) };
188         uint32_t dequeued{ currentWord };
189         do {
190             nextWord = currentWord & (currentWord - 1);
191             dequeued = currentWord;
192         } while (!queueWord_.compare_exchange_weak(currentWord, nextWord, std::memory_order_relaxed));
193 
194         if (currentWord == 0) {
195             return Type::GetInvalidExecutor();
196         }
197         // get the count of trailing zeros
198         return Type::GetGCRunner(__builtin_ctz(dequeued));
199     }
200 
201     // When gc thread exits, clear all tasks in queueWord
Clear()202     void Clear() { queueWord_.store(0, std::memory_order_relaxed); }
203 
204 private:
205     std::atomic<uint32_t> queueWord_ = {};
206 };
207 
208 template<typename Type>
209 class GCTaskQueue {
210     static_assert(std::is_base_of<GCTask, Type>::value, "T is not a subclass of GCTask");
211 
212 public:
213     using GCTaskFilter = std::function<bool(Type& oldTask, Type& newTask)>;
214     using GCTaskQueueType = std::list<Type, StdContainerAllocator<Type, GC_TASK_QUEUE>>;
215 
Init()216     void Init() { syncTaskIndex_ = GCTask::TASK_INDEX_SYNC_GC_MIN; }
217 
Finish()218     void Finish()
219     {
220         std::lock_guard<std::recursive_mutex> lock(taskQueueLock_);
221         asyncTaskQueue_.Clear();
222         syncTaskQueue_.clear();
223     }
224 
225     // Add one task to syncTaskQueue
226     // Return the accumulated gc times
EnqueueSync(Type & task,GCTaskFilter & filter)227     uint64_t EnqueueSync(Type& task, GCTaskFilter& filter)
228     {
229         std::unique_lock<std::recursive_mutex> lock(taskQueueLock_);
230         GCTaskQueueType& queue = syncTaskQueue_;
231 
232         if (!queue.empty() && task.NeedFilter()) {
233             for (auto iter = queue.rbegin(); iter != queue.rend(); ++iter) {
234                 if (filter(*iter, task)) {
235                     return (*iter).GetTaskIndex();
236                 }
237             }
238         }
239         task.SetTaskIndex(static_cast<GCTask::GCTaskIndex>(++syncTaskIndex_));
240         queue.push_back(task);
241         taskQueueCondVar_.notify_all();
242         return task.GetTaskIndex();
243     }
244 
245     // Add one task to asyncTaskQueue
EnqueueAsync(const Type & task)246     void EnqueueAsync(const Type& task)
247     {
248         asyncTaskQueue_.Push(task);
249         std::unique_lock<std::recursive_mutex> lock(taskQueueLock_);
250         taskQueueCondVar_.notify_all();
251     }
252 
253     // Retrieve a garbage collection task from the task queue
254     // Prioritize synchronous tasks from syncTaskQueue before asynchronous ones from asyncTaskQueue
Dequeue()255     Type Dequeue()
256     {
257         std::chrono::nanoseconds waitTime(DEFAULT_GC_TASK_INTERVAL_TIMEOUT_NS);
258         std::cv_status cvStatus = std::cv_status::no_timeout;
259         while (true) {
260             std::unique_lock<std::recursive_mutex> lock(taskQueueLock_);
261             // Prioritize synchronous task queue first
262             if (!syncTaskQueue_.empty()) {
263                 Type currentTask(syncTaskQueue_.front());
264                 syncTaskQueue_.pop_front();
265                 return currentTask;
266             }
267 
268             // Retrieve the task and then process data with dfx
269             Type task = asyncTaskQueue_.Pop();
270             if (task.IsInvalid()) {
271                 VLOG(DEBUG, "invalid gc task: type %u, reason %u", task.GetTaskType(), task.GetGCReason());
272             } else {
273                 VLOG(DEBUG, "dequeue gc task: type %u. reason %u", task.GetTaskType(), task.GetGCReason());
274                 return task;
275             }
276 
277             cvStatus = taskQueueCondVar_.wait_for(lock, waitTime);
278         }
279     }
280 
281     // GC thread poll task queue and execute gc task
DrainTaskQueue(void * owner)282     void DrainTaskQueue(void* owner)
283     {
284         while (true) {
285             Type task = Dequeue();
286             if (!task.Execute(owner)) {
287                 Finish();
288                 break;
289             }
290         }
291     }
292 
293 private:
294     static constexpr uint64_t DEFAULT_GC_TASK_INTERVAL_TIMEOUT_NS = 1000L * 1000 * 1000; // default 1s
295     std::recursive_mutex taskQueueLock_;
296     std::condition_variable_any taskQueueCondVar_;
297     uint64_t syncTaskIndex_ = 0;
298     GCTaskQueueType syncTaskQueue_;
299     GCLocklessTaskQueue<Type> asyncTaskQueue_;
300 };
301 } // namespace common
302 
303 #endif  // COMMON_COMPONENTS_HEAP_COLLECTOR_TASK_QUEUE_H
304