1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "send_task_scheduler.h"
17 #include <algorithm>
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "serial_buffer.h"
21
22 namespace DistributedDB {
23 // In current parameters, the scheduler will hold 160 MB in extreme situation.
24 // In actual runtime situation, the scheduler will hold no more than 100 MB.
25 static constexpr uint32_t MAX_CAPACITY = 67108864; // 64 M bytes
26 static constexpr uint32_t EXTRA_CAPACITY_FOR_NORMAL_PRIORITY = 33554432; // 32 M bytes
27 static constexpr uint32_t EXTRA_CAPACITY_FOR_HIGH_PRIORITY = 67108864; // 64 M bytes
28
~SendTaskScheduler()29 SendTaskScheduler::~SendTaskScheduler()
30 {
31 Finalize();
32 }
33
Initialize()34 void SendTaskScheduler::Initialize()
35 {
36 priorityOrder_.clear();
37 priorityOrder_.push_back(Priority::HIGH);
38 priorityOrder_.push_back(Priority::NORMAL);
39 priorityOrder_.push_back(Priority::LOW);
40 for (const auto &prio : priorityOrder_) {
41 extraCapacityInByteByPrio_[prio] = 0;
42 taskCountByPrio_[prio] = 0;
43 taskDelayCountByPrio_[prio] = 0;
44 taskGroupByPrio_[prio] = TaskListByTarget();
45 }
46 extraCapacityInByteByPrio_[Priority::NORMAL] = EXTRA_CAPACITY_FOR_NORMAL_PRIORITY;
47 extraCapacityInByteByPrio_[Priority::HIGH] = EXTRA_CAPACITY_FOR_HIGH_PRIORITY;
48 }
49
Finalize()50 void SendTaskScheduler::Finalize()
51 {
52 while (GetTotalTaskCount() != 0) {
53 SendTask task;
54 SendTaskInfo taskInfo;
55 int errCode = ScheduleOutSendTask(task, taskInfo);
56 if (errCode != E_OK) {
57 LOGE("[Scheduler][Final] INTERNAL ERROR.");
58 break; // Not possible to happen
59 }
60 LOGW("[Scheduler][Finalize] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", task.dstTarget.c_str(),
61 taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
62 FinalizeLastScheduleTask();
63 }
64 }
65
AddSendTaskIntoSchedule(const SendTask & inTask,Priority inPrio)66 int SendTaskScheduler::AddSendTaskIntoSchedule(const SendTask &inTask, Priority inPrio)
67 {
68 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
69 if (curTotalSizeByByte_ >= MAX_CAPACITY + extraCapacityInByteByPrio_[inPrio]) {
70 return -E_CONTAINER_FULL;
71 }
72
73 uint32_t taskSizeByByte = inTask.buffer->GetSize();
74 curTotalSizeByByte_ += taskSizeByByte;
75 curTotalSizeByTask_++;
76 if (policyMap_.count(inTask.dstTarget) == 0) {
77 policyMap_[inTask.dstTarget] = TargetPolicy::NO_DELAY;
78 }
79 if (policyMap_[inTask.dstTarget] == TargetPolicy::DELAY) {
80 delayTaskCount_++;
81 taskDelayCountByPrio_[inPrio]++;
82 }
83
84 taskCountByPrio_[inPrio]++;
85 taskOrderByPrio_[inPrio].push_back(inTask.dstTarget);
86 taskGroupByPrio_[inPrio][inTask.dstTarget].push_back(inTask);
87 return E_OK;
88 }
89
ScheduleOutSendTask(SendTask & outTask)90 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask)
91 {
92 SendTaskInfo taskInfo;
93 int errCode = ScheduleOutSendTask(outTask, taskInfo);
94 if (errCode == E_OK) {
95 LOGI("[Scheduler][OutTask] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", outTask.dstTarget.c_str(),
96 taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
97 }
98 return errCode;
99 }
100
ScheduleOutSendTask(SendTask & outTask,SendTaskInfo & outTaskInfo)101 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
102 {
103 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
104 if (curTotalSizeByTask_ == 0) {
105 return -E_CONTAINER_EMPTY;
106 }
107
108 if (delayTaskCount_ == curTotalSizeByTask_) {
109 // Tasks are all in delay status
110 int errCode = ScheduleDelayTask(outTask, outTaskInfo);
111 if (errCode == E_OK) {
112 // Update last schedule location
113 lastScheduleTarget_ = outTask.dstTarget;
114 lastSchedulePriority_ = outTaskInfo.taskPrio;
115 scheduledFlag_ = true;
116 }
117 return errCode;
118 } else {
119 // There are some tasks not in delay status
120 int errCode = ScheduleNoDelayTask(outTask, outTaskInfo);
121 if (errCode == E_OK) {
122 // Update last schedule location
123 lastScheduleTarget_ = outTask.dstTarget;
124 lastSchedulePriority_ = outTaskInfo.taskPrio;
125 scheduledFlag_ = true;
126 }
127 return errCode;
128 }
129 }
130
FinalizeLastScheduleTask()131 int SendTaskScheduler::FinalizeLastScheduleTask()
132 {
133 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
134 if (curTotalSizeByTask_ == 0) {
135 return -E_CONTAINER_EMPTY;
136 }
137 if (!scheduledFlag_) {
138 return -E_NOT_PERMIT;
139 }
140
141 // Retrieve last scheduled task
142 SendTask task = taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].front();
143
144 bool isFullBefore = (curTotalSizeByByte_ >= MAX_CAPACITY);
145 uint32_t taskSize = task.buffer->GetSize();
146 curTotalSizeByByte_ -= taskSize;
147 bool isFullAfter = (curTotalSizeByByte_ >= MAX_CAPACITY);
148
149 curTotalSizeByTask_--;
150 taskCountByPrio_[lastSchedulePriority_]--;
151 if (policyMap_[lastScheduleTarget_] == TargetPolicy::DELAY) {
152 delayTaskCount_--;
153 taskDelayCountByPrio_[lastSchedulePriority_]--;
154 }
155
156 for (auto iter = taskOrderByPrio_[lastSchedulePriority_].begin();
157 iter != taskOrderByPrio_[lastSchedulePriority_].end(); ++iter) {
158 if (*iter == lastScheduleTarget_) {
159 taskOrderByPrio_[lastSchedulePriority_].erase(iter);
160 break;
161 }
162 }
163
164 taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].pop_front();
165 delete task.buffer;
166 task.buffer = nullptr;
167 scheduledFlag_ = false;
168
169 if (isFullBefore && !isFullAfter) {
170 return -E_CONTAINER_FULL_TO_NOTFULL;
171 }
172 if (curTotalSizeByTask_ == 0) {
173 return -E_CONTAINER_NOTEMPTY_TO_EMPTY;
174 }
175 if (curTotalSizeByTask_ == delayTaskCount_) {
176 return -E_CONTAINER_ONLY_DELAY_TASK;
177 }
178
179 return E_OK;
180 }
181
DelayTaskByTarget(const std::string & inTarget)182 int SendTaskScheduler::DelayTaskByTarget(const std::string &inTarget)
183 {
184 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
185 if (policyMap_.count(inTarget) == 0) {
186 LOGE("[Scheduler][DelayTask] Not found inTarget=%s{private}", inTarget.c_str());
187 return -E_NOT_FOUND;
188 }
189 if (policyMap_[inTarget] == TargetPolicy::DELAY) {
190 return E_OK;
191 }
192
193 policyMap_[inTarget] = TargetPolicy::DELAY;
194 for (auto &prio : priorityOrder_) {
195 size_t count = taskGroupByPrio_[prio][inTarget].size();
196 taskDelayCountByPrio_[prio] += static_cast<uint32_t>(count);
197 delayTaskCount_ += static_cast<uint32_t>(count);
198 }
199 return E_OK;
200 }
201
NoDelayTaskByTarget(const std::string & inTarget)202 int SendTaskScheduler::NoDelayTaskByTarget(const std::string &inTarget)
203 {
204 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
205 if (policyMap_.count(inTarget) == 0) {
206 LOGE("[Scheduler][NoDelayTask] Not found inTarget=%s{private}", inTarget.c_str());
207 return -E_NOT_FOUND;
208 }
209 if (policyMap_[inTarget] == TargetPolicy::NO_DELAY) {
210 return E_OK;
211 }
212
213 policyMap_[inTarget] = TargetPolicy::NO_DELAY;
214 for (auto &prio : priorityOrder_) {
215 size_t count = taskGroupByPrio_[prio][inTarget].size();
216 // Logic guarantee that former not smaller than latter
217 taskDelayCountByPrio_[prio] -= static_cast<uint32_t>(count);
218 delayTaskCount_ -= static_cast<uint32_t>(count);
219 }
220 return E_OK;
221 }
222
GetTotalTaskCount() const223 uint32_t SendTaskScheduler::GetTotalTaskCount() const
224 {
225 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
226 return curTotalSizeByTask_;
227 }
228
GetNoDelayTaskCount() const229 uint32_t SendTaskScheduler::GetNoDelayTaskCount() const
230 {
231 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
232 // delayTaskCount_ never greater than curTotalSizeByTask_
233 return curTotalSizeByTask_ - delayTaskCount_;
234 }
235
ScheduleDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)236 int SendTaskScheduler::ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
237 {
238 for (const auto &prio : priorityOrder_) {
239 if (taskCountByPrio_[prio] == 0) {
240 // No task of this priority
241 continue;
242 }
243 // Logic guarantee that lists access below will not be empty
244 std::string dstTarget = taskOrderByPrio_[prio].front();
245 outTask = taskGroupByPrio_[prio][dstTarget].front();
246 outTaskInfo.delayFlag = true;
247 outTaskInfo.taskPrio = prio;
248 return E_OK;
249 }
250 LOGE("[Scheduler][ScheduleDelay] INTERNAL ERROR : NO TASK.");
251 return -E_INTERNAL_ERROR;
252 }
253
ScheduleNoDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)254 int SendTaskScheduler::ScheduleNoDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
255 {
256 for (const auto &prio : priorityOrder_) {
257 if (taskCountByPrio_[prio] == 0 || taskCountByPrio_[prio] == taskDelayCountByPrio_[prio]) {
258 // No no_delay_task of this priority
259 continue;
260 }
261 // Logic guarantee that lists accessed below will not be empty
262 std::string dstTarget;
263 bool findFlag = false; // Not necessary in fact
264 for (auto iter = taskOrderByPrio_[prio].begin(); iter != taskOrderByPrio_[prio].end(); ++iter) {
265 // Logic guarantee that there is at least one target in orderList that is NO_DELAY
266 dstTarget = *iter;
267 if (policyMap_[dstTarget] == TargetPolicy::NO_DELAY) {
268 findFlag = true;
269 break;
270 }
271 }
272 if (!findFlag) {
273 LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO_DELAY NOT FOUND.");
274 return -E_INTERNAL_ERROR;
275 }
276
277 outTask = taskGroupByPrio_[prio][dstTarget].front();
278 outTaskInfo.delayFlag = false;
279 outTaskInfo.taskPrio = prio;
280 return E_OK;
281 }
282 LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO TASK.");
283 return -E_INTERNAL_ERROR;
284 }
285 }
286