1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "base_task_group.h"
17 #include "dp_log.h"
18
19 namespace OHOS {
20 namespace CameraStandard {
21 namespace DeferredProcessing {
BaseTaskGroup(const std::string & name,TaskFunc func,bool serial,const ThreadPool * threadPool)22 BaseTaskGroup::BaseTaskGroup(const std::string& name, TaskFunc func, bool serial, const ThreadPool* threadPool)
23 : name_(name),
24 func_(std::move(func)),
25 serial_(serial),
26 threadPool_(threadPool),
27 handle_(INVALID_TASK_GROUP_HANDLE),
28 inflight_(false),
29 que_(name + " queue")
30 {
31 DP_DEBUG_LOG("task group (%s).", name_.c_str());
32 }
33
~BaseTaskGroup()34 BaseTaskGroup::~BaseTaskGroup()
35 {
36 DP_DEBUG_LOG("task group name: %s, handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
37 que_.SetActive(false);
38 que_.Clear();
39 }
40
Initialize()41 void BaseTaskGroup::Initialize()
42 {
43 handle_ = GenerateHandle();
44 que_.SetActive(true);
45 DP_DEBUG_LOG("task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
46 }
47
GetName()48 const std::string& BaseTaskGroup::GetName()
49 {
50 return name_;
51 }
52
GetHandle()53 TaskGroupHandle BaseTaskGroup::GetHandle()
54 {
55 return handle_;
56 }
57
SubmitTask(std::any param)58 bool BaseTaskGroup::SubmitTask(std::any param)
59 {
60 std::lock_guard<std::mutex> lock(mutex_);
61 if (que_.Full()) {
62 DP_WARNING_LOG("Submit task (%s), handle: %{public}d, que is full!", name_.c_str(), static_cast<int>(handle_));
63 } else {
64 DP_DEBUG_LOG("Submit task (%s), handle: %{public}d, size: %zu.", name_.c_str(),
65 static_cast<int>(handle_), que_.Size());
66 }
67 que_.Push(std::move(param));
68 DispatchTaskUnlocked();
69 return true;
70 }
71
CancelAllTasks()72 void BaseTaskGroup::CancelAllTasks()
73 {
74 std::lock_guard<std::mutex> lock(mutex_);
75 DP_DEBUG_LOG("Cancel all tasks for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
76 que_.Clear();
77 DP_CHECK_RETURN(!serial_);
78 inflight_ = false;
79 }
80
GetTaskCount()81 size_t BaseTaskGroup::GetTaskCount()
82 {
83 std::lock_guard<std::mutex> lock(mutex_);
84 DP_DEBUG_LOG("Get task count for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
85 return que_.Size();
86 }
87
GetTaskUnlocked()88 std::function<void()> BaseTaskGroup::GetTaskUnlocked()
89 {
90 if (que_.Empty()) {
91 DP_DEBUG_LOG("(%s) no available tasks.", name_.c_str());
92 return {};
93 }
94 std::weak_ptr<BaseTaskGroup> weakThis(shared_from_this());
95 auto task = [param = que_.Pop(), weakThis]() {
96 auto thiz = weakThis.lock();
97 if (thiz) {
98 thiz->func_(std::move(param));
99 thiz->OnTaskComplete();
100 }
101 };
102 DP_DEBUG_LOG("return one task %s, handle:%{public}d, size: %zu.", name_.c_str(), static_cast<int>(handle_),
103 que_.Size());
104 return task;
105 }
106
GenerateHandle()107 TaskGroupHandle BaseTaskGroup::GenerateHandle()
108 {
109 static std::atomic<uint64_t> counter = 0;
110 DP_DEBUG_LOG("(%s) entered.", name_.c_str());
111 uint64_t prefix = std::hash<std::string>{}(name_);
112 uint64_t handle = ++counter;
113 return prefix | handle;
114 }
115
DispatchTaskUnlocked()116 void BaseTaskGroup::DispatchTaskUnlocked()
117 {
118 DP_DEBUG_LOG("(%s) entered.", name_.c_str());
119 if (serial_ && inflight_.load()) {
120 DP_DEBUG_LOG("(%s), task is running, redispatch tasks after finish running.", name_.c_str());
121 return;
122 }
123 auto task = GetTaskUnlocked();
124 if (task) {
125 inflight_ = true;
126 threadPool_->Submit([task = std::move(task)] { task(); });
127 } else {
128 DP_DEBUG_LOG("all tasks have completed for %s handle:%{public}d.", name_.c_str(), static_cast<int>(handle_));
129 }
130 }
OnTaskComplete()131 void BaseTaskGroup::OnTaskComplete()
132 {
133 std::lock_guard<std::mutex> lock(mutex_);
134 if (serial_) {
135 inflight_ = false;
136 }
137 DispatchTaskUnlocked();
138 }
139 } //namespace DeferredProcessing
140 } // namespace CameraStandard
141 } // namespace OHOS
142