• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "base_task_group.h"
17 #include "dp_log.h"
18 
19 namespace OHOS {
20 namespace CameraStandard {
21 namespace DeferredProcessing {
BaseTaskGroup(const std::string & name,TaskFunc func,bool serial,const ThreadPool * threadPool)22 BaseTaskGroup::BaseTaskGroup(const std::string& name, TaskFunc func, bool serial, const ThreadPool* threadPool)
23     : name_(name),
24       func_(std::move(func)),
25       serial_(serial),
26       threadPool_(threadPool),
27       handle_(INVALID_TASK_GROUP_HANDLE),
28       inflight_(false),
29       que_(name + " queue")
30 {
31     DP_DEBUG_LOG("task group (%s).", name_.c_str());
32 }
33 
~BaseTaskGroup()34 BaseTaskGroup::~BaseTaskGroup()
35 {
36     DP_DEBUG_LOG("task group name: %s, handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
37     que_.SetActive(false);
38     que_.Clear();
39 }
40 
Initialize()41 void BaseTaskGroup::Initialize()
42 {
43     handle_ = GenerateHandle();
44     que_.SetActive(true);
45     DP_DEBUG_LOG("task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
46 }
47 
GetName()48 const std::string& BaseTaskGroup::GetName()
49 {
50     return name_;
51 }
52 
GetHandle()53 TaskGroupHandle BaseTaskGroup::GetHandle()
54 {
55     return handle_;
56 }
57 
SubmitTask(std::any param)58 bool BaseTaskGroup::SubmitTask(std::any param)
59 {
60     std::lock_guard<std::mutex> lock(mutex_);
61     if (que_.Full()) {
62         DP_WARNING_LOG("Submit task (%s), handle: %{public}d, que is full!", name_.c_str(), static_cast<int>(handle_));
63     } else {
64         DP_DEBUG_LOG("Submit task (%s), handle: %{public}d, size: %zu.", name_.c_str(),
65             static_cast<int>(handle_), que_.Size());
66     }
67     que_.Push(std::move(param));
68     DispatchTaskUnlocked();
69     return true;
70 }
71 
CancelAllTasks()72 void BaseTaskGroup::CancelAllTasks()
73 {
74     std::lock_guard<std::mutex> lock(mutex_);
75     DP_DEBUG_LOG("Cancel all tasks for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
76     que_.Clear();
77     DP_CHECK_RETURN(!serial_);
78     inflight_ = false;
79 }
80 
GetTaskCount()81 size_t BaseTaskGroup::GetTaskCount()
82 {
83     std::lock_guard<std::mutex> lock(mutex_);
84     DP_DEBUG_LOG("Get task count for task group (%s), handle: %{public}d", name_.c_str(), static_cast<int>(handle_));
85     return que_.Size();
86 }
87 
GetTaskUnlocked()88 std::function<void()> BaseTaskGroup::GetTaskUnlocked()
89 {
90     if (que_.Empty()) {
91         DP_DEBUG_LOG("(%s) no available tasks.", name_.c_str());
92         return {};
93     }
94     std::weak_ptr<BaseTaskGroup> weakThis(shared_from_this());
95     auto task = [param = que_.Pop(), weakThis]() {
96         auto thiz = weakThis.lock();
97         if (thiz) {
98             thiz->func_(std::move(param));
99             thiz->OnTaskComplete();
100         }
101     };
102     DP_DEBUG_LOG("return one task %s, handle:%{public}d, size: %zu.", name_.c_str(), static_cast<int>(handle_),
103         que_.Size());
104     return task;
105 }
106 
GenerateHandle()107 TaskGroupHandle BaseTaskGroup::GenerateHandle()
108 {
109     static std::atomic<uint64_t> counter = 0;
110     DP_DEBUG_LOG("(%s) entered.", name_.c_str());
111     uint64_t prefix = std::hash<std::string>{}(name_);
112     uint64_t handle = ++counter;
113     return prefix | handle;
114 }
115 
DispatchTaskUnlocked()116 void BaseTaskGroup::DispatchTaskUnlocked()
117 {
118     DP_DEBUG_LOG("(%s) entered.", name_.c_str());
119     if (serial_ && inflight_.load()) {
120         DP_DEBUG_LOG("(%s), task is running, redispatch tasks after finish running.", name_.c_str());
121         return;
122     }
123     auto task = GetTaskUnlocked();
124     if (task) {
125         inflight_ = true;
126         threadPool_->Submit([task = std::move(task)] { task(); });
127     } else {
128         DP_DEBUG_LOG("all tasks have completed for %s handle:%{public}d.", name_.c_str(), static_cast<int>(handle_));
129     }
130 }
OnTaskComplete()131 void BaseTaskGroup::OnTaskComplete()
132 {
133     std::lock_guard<std::mutex> lock(mutex_);
134     if (serial_) {
135         inflight_ = false;
136     }
137     DispatchTaskUnlocked();
138 }
139 } //namespace DeferredProcessing
140 } // namespace CameraStandard
141 } // namespace OHOS
142