• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "send_task_scheduler.h"
17 #include <algorithm>
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "serial_buffer.h"
21 
22 namespace DistributedDB {
// Capacity thresholds of the send task scheduler, all expressed in bytes.
// With the current parameters, the scheduler will hold 160 MB in the extreme situation.
// In an actual runtime situation, the scheduler will hold no more than 100 MB.
static constexpr uint32_t MAX_CAPACITY = 64U * 1024U * 1024U; // 64 M bytes
static constexpr uint32_t EXTRA_CAPACITY_FOR_NORMAL_PRIORITY = 32U * 1024U * 1024U; // 32 M bytes
static constexpr uint32_t EXTRA_CAPACITY_FOR_HIGH_PRIORITY = 64U * 1024U * 1024U; // 64 M bytes
28 
~SendTaskScheduler()29 SendTaskScheduler::~SendTaskScheduler()
30 {
31     Finalize();
32 }
33 
Initialize()34 void SendTaskScheduler::Initialize()
35 {
36     priorityOrder_.clear();
37     priorityOrder_.push_back(Priority::HIGH);
38     priorityOrder_.push_back(Priority::NORMAL);
39     priorityOrder_.push_back(Priority::LOW);
40     for (const auto &prio : priorityOrder_) {
41         extraCapacityInByteByPrio_[prio] = 0;
42         taskCountByPrio_[prio] = 0;
43         taskDelayCountByPrio_[prio] = 0;
44         taskGroupByPrio_[prio] = TaskListByTarget();
45     }
46     extraCapacityInByteByPrio_[Priority::NORMAL] = EXTRA_CAPACITY_FOR_NORMAL_PRIORITY;
47     extraCapacityInByteByPrio_[Priority::HIGH] = EXTRA_CAPACITY_FOR_HIGH_PRIORITY;
48 }
49 
Finalize()50 void SendTaskScheduler::Finalize()
51 {
52     while (GetTotalTaskCount() != 0) {
53         SendTask task;
54         SendTaskInfo taskInfo;
55         int errCode = ScheduleOutSendTask(task, taskInfo);
56         if (errCode != E_OK) {
57             LOGE("[Scheduler][Final] INTERNAL ERROR.");
58             break; // Not possible to happen
59         }
60         LOGW("[Scheduler][Finalize] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", task.dstTarget.c_str(),
61             taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
62         FinalizeLastScheduleTask();
63     }
64 }
65 
AddSendTaskIntoSchedule(const SendTask & inTask,Priority inPrio)66 int SendTaskScheduler::AddSendTaskIntoSchedule(const SendTask &inTask, Priority inPrio)
67 {
68     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
69     if (curTotalSizeByByte_ >= MAX_CAPACITY + extraCapacityInByteByPrio_[inPrio]) {
70         return -E_CONTAINER_FULL;
71     }
72 
73     uint32_t taskSizeByByte = inTask.buffer->GetSize();
74     curTotalSizeByByte_ += taskSizeByByte;
75     curTotalSizeByTask_++;
76     if (policyMap_.count(inTask.dstTarget) == 0) {
77         policyMap_[inTask.dstTarget] = TargetPolicy::NO_DELAY;
78     }
79     if (policyMap_[inTask.dstTarget] == TargetPolicy::DELAY) {
80         delayTaskCount_++;
81         taskDelayCountByPrio_[inPrio]++;
82     }
83 
84     taskCountByPrio_[inPrio]++;
85     taskOrderByPrio_[inPrio].push_back(inTask.dstTarget);
86     taskGroupByPrio_[inPrio][inTask.dstTarget].push_back(inTask);
87     return E_OK;
88 }
89 
ScheduleOutSendTask(SendTask & outTask)90 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask)
91 {
92     SendTaskInfo taskInfo;
93     int errCode = ScheduleOutSendTask(outTask, taskInfo);
94     if (errCode == E_OK) {
95         LOGI("[Scheduler][OutTask] dstTarget=%s{private}, delayFlag=%d, taskPrio=%d", outTask.dstTarget.c_str(),
96             taskInfo.delayFlag, static_cast<int>(taskInfo.taskPrio));
97     }
98     return errCode;
99 }
100 
ScheduleOutSendTask(SendTask & outTask,SendTaskInfo & outTaskInfo)101 int SendTaskScheduler::ScheduleOutSendTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
102 {
103     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
104     if (curTotalSizeByTask_ == 0) {
105         return -E_CONTAINER_EMPTY;
106     }
107 
108     if (delayTaskCount_ == curTotalSizeByTask_) {
109         // Tasks are all in delay status
110         int errCode = ScheduleDelayTask(outTask, outTaskInfo);
111         if (errCode == E_OK) {
112             // Update last schedule location
113             lastScheduleTarget_ = outTask.dstTarget;
114             lastSchedulePriority_ = outTaskInfo.taskPrio;
115             scheduledFlag_ = true;
116         }
117         return errCode;
118     } else {
119         // There are some tasks not in delay status
120         int errCode = ScheduleNoDelayTask(outTask, outTaskInfo);
121         if (errCode == E_OK) {
122             // Update last schedule location
123             lastScheduleTarget_ = outTask.dstTarget;
124             lastSchedulePriority_ = outTaskInfo.taskPrio;
125             scheduledFlag_ = true;
126         }
127         return errCode;
128     }
129 }
130 
FinalizeLastScheduleTask()131 int SendTaskScheduler::FinalizeLastScheduleTask()
132 {
133     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
134     if (curTotalSizeByTask_ == 0) {
135         return -E_CONTAINER_EMPTY;
136     }
137     if (!scheduledFlag_) {
138         return -E_NOT_PERMIT;
139     }
140 
141     // Retrieve last scheduled task
142     SendTask task = taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].front();
143 
144     bool isFullBefore = (curTotalSizeByByte_ >= MAX_CAPACITY);
145     uint32_t taskSize = task.buffer->GetSize();
146     curTotalSizeByByte_ -= taskSize;
147     bool isFullAfter = (curTotalSizeByByte_ >= MAX_CAPACITY);
148 
149     curTotalSizeByTask_--;
150     taskCountByPrio_[lastSchedulePriority_]--;
151     if (policyMap_[lastScheduleTarget_] == TargetPolicy::DELAY) {
152         delayTaskCount_--;
153         taskDelayCountByPrio_[lastSchedulePriority_]--;
154     }
155 
156     for (auto iter = taskOrderByPrio_[lastSchedulePriority_].begin();
157         iter != taskOrderByPrio_[lastSchedulePriority_].end(); ++iter) {
158         if (*iter == lastScheduleTarget_) {
159             taskOrderByPrio_[lastSchedulePriority_].erase(iter);
160             break;
161         }
162     }
163 
164     taskGroupByPrio_[lastSchedulePriority_][lastScheduleTarget_].pop_front();
165     delete task.buffer;
166     task.buffer = nullptr;
167     scheduledFlag_ = false;
168 
169     if (isFullBefore && !isFullAfter) {
170         return -E_CONTAINER_FULL_TO_NOTFULL;
171     }
172     if (curTotalSizeByTask_ == 0) {
173         return -E_CONTAINER_NOTEMPTY_TO_EMPTY;
174     }
175     if (curTotalSizeByTask_ == delayTaskCount_) {
176         return -E_CONTAINER_ONLY_DELAY_TASK;
177     }
178 
179     return E_OK;
180 }
181 
DelayTaskByTarget(const std::string & inTarget)182 int SendTaskScheduler::DelayTaskByTarget(const std::string &inTarget)
183 {
184     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
185     if (policyMap_.count(inTarget) == 0) {
186         LOGE("[Scheduler][DelayTask] Not found inTarget=%s{private}", inTarget.c_str());
187         return -E_NOT_FOUND;
188     }
189     if (policyMap_[inTarget] == TargetPolicy::DELAY) {
190         return E_OK;
191     }
192 
193     policyMap_[inTarget] = TargetPolicy::DELAY;
194     for (auto &prio : priorityOrder_) {
195         size_t count = taskGroupByPrio_[prio][inTarget].size();
196         taskDelayCountByPrio_[prio] += static_cast<uint32_t>(count);
197         delayTaskCount_ += static_cast<uint32_t>(count);
198     }
199     return E_OK;
200 }
201 
NoDelayTaskByTarget(const std::string & inTarget)202 int SendTaskScheduler::NoDelayTaskByTarget(const std::string &inTarget)
203 {
204     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
205     if (policyMap_.count(inTarget) == 0) {
206         LOGE("[Scheduler][NoDelayTask] Not found inTarget=%s{private}", inTarget.c_str());
207         return -E_NOT_FOUND;
208     }
209     if (policyMap_[inTarget] == TargetPolicy::NO_DELAY) {
210         return E_OK;
211     }
212 
213     policyMap_[inTarget] = TargetPolicy::NO_DELAY;
214     for (auto &prio : priorityOrder_) {
215         size_t count = taskGroupByPrio_[prio][inTarget].size();
216         // Logic guarantee that former not smaller than latter
217         taskDelayCountByPrio_[prio] -= static_cast<uint32_t>(count);
218         delayTaskCount_ -= static_cast<uint32_t>(count);
219     }
220     return E_OK;
221 }
222 
GetTotalTaskCount() const223 uint32_t SendTaskScheduler::GetTotalTaskCount() const
224 {
225     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
226     return curTotalSizeByTask_;
227 }
228 
GetNoDelayTaskCount() const229 uint32_t SendTaskScheduler::GetNoDelayTaskCount() const
230 {
231     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
232     // delayTaskCount_ never greater than curTotalSizeByTask_
233     return curTotalSizeByTask_ - delayTaskCount_;
234 }
235 
ScheduleDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)236 int SendTaskScheduler::ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
237 {
238     for (const auto &prio : priorityOrder_) {
239         if (taskCountByPrio_[prio] == 0) {
240             // No task of this priority
241             continue;
242         }
243         // Logic guarantee that lists access below will not be empty
244         std::string dstTarget = taskOrderByPrio_[prio].front();
245         outTask = taskGroupByPrio_[prio][dstTarget].front();
246         outTaskInfo.delayFlag = true;
247         outTaskInfo.taskPrio = prio;
248         return E_OK;
249     }
250     LOGE("[Scheduler][ScheduleDelay] INTERNAL ERROR : NO TASK.");
251     return -E_INTERNAL_ERROR;
252 }
253 
ScheduleNoDelayTask(SendTask & outTask,SendTaskInfo & outTaskInfo)254 int SendTaskScheduler::ScheduleNoDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo)
255 {
256     for (const auto &prio : priorityOrder_) {
257         if (taskCountByPrio_[prio] == 0 || taskCountByPrio_[prio] == taskDelayCountByPrio_[prio]) {
258             // No no_delay_task of this priority
259             continue;
260         }
261         // Logic guarantee that lists accessed below will not be empty
262         std::string dstTarget;
263         bool findFlag = false; // Not necessary in fact
264         for (auto iter = taskOrderByPrio_[prio].begin(); iter != taskOrderByPrio_[prio].end(); ++iter) {
265             // Logic guarantee that there is at least one target in orderList that is NO_DELAY
266             dstTarget = *iter;
267             if (policyMap_[dstTarget] == TargetPolicy::NO_DELAY) {
268                 findFlag = true;
269                 break;
270             }
271         }
272         if (!findFlag) {
273             LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO_DELAY NOT FOUND.");
274             return -E_INTERNAL_ERROR;
275         }
276 
277         outTask = taskGroupByPrio_[prio][dstTarget].front();
278         outTaskInfo.delayFlag = false;
279         outTaskInfo.taskPrio = prio;
280         return E_OK;
281     }
282     LOGE("[Scheduler][ScheduleNoDelay] INTERNAL ERROR : NO TASK.");
283     return -E_INTERNAL_ERROR;
284 }
285 }
286