/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SEND_TASK_SCHEDULER_H
#define SEND_TASK_SCHEDULER_H

#include <cstdint>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <vector>
#include "communicator_type_define.h"
#include "iprocess_communicator.h"
#include "macro_utils.h"

namespace DistributedDB {
// Scheduling policy recorded per target (see SendTaskScheduler::policyMap_).
// NOTE(review): presumably switched by DelayTaskByTarget() / NoDelayTaskByTarget() - confirm in the implementation.
enum class TargetPolicy {
    NO_DELAY = 0,
    DELAY = 1,
};

class SerialBuffer; // Forward Declaration

// One unit of work handed to the scheduler: a buffer plus the bookkeeping needed to send it to a target.
struct SendTask {
    // Data to be sent. Only a raw pointer is stored; who frees the buffer is not visible in this header.
    SerialBuffer *buffer = nullptr;
    // Identifier of the destination target.
    std::string dstTarget;
    // Callback of type OnSendEnd (declared outside this header).
    // NOTE(review): presumably invoked when sending of this task ends - confirm against the caller.
    OnSendEnd onEnd;
    uint32_t frameId = 0u;
    // Tasks start out valid.
    // NOTE(review): presumably cleared through SendTaskScheduler::InvalidSendTask() - confirm.
    bool isValid = true;
    // Defaults to true; the exact retry semantics are defined by the users of this struct, not here.
    bool isRetryTask = true;
    AccessInfos infos;
};

// Extra information reported alongside a task by the three-argument ScheduleOutSendTask() overload.
struct SendTaskInfo {
    bool delayFlag = false;
    Priority taskPrio = Priority::LOW;
};

// Tasks grouped by target: target identifier -> list of tasks for that target.
using TaskListByTarget = std::map<std::string, std::list<SendTask>>;

// Keeps pending SendTask objects grouped by priority and by target (see taskGroupByPrio_) and
// exposes a producer side (AddSendTaskIntoSchedule) and a consumer side (ScheduleOutSendTask,
// FinalizeLastScheduleTask, Finalize).
// NOTE(review): the int return values look like error codes - confirm the exact values in the implementation.
class SendTaskScheduler {
public:
    SendTaskScheduler() = default; // Default constructor must be explicitly provided due to DISABLE_COPY_ASSIGN_MOVE
    ~SendTaskScheduler();

    DISABLE_COPY_ASSIGN_MOVE(SendTaskScheduler);

    void Initialize();

    // This method is for the consumer.
    void Finalize();

    // This method is for producers; calling it from multiple threads is supported.
    int AddSendTaskIntoSchedule(const SendTask &inTask, Priority inPrio);

    // These methods are for the consumer; calling them from multiple threads is not recommended.
    // The second overload additionally fills outTaskInfo.
    int ScheduleOutSendTask(SendTask &outTask, uint32_t &totalLength);
    int ScheduleOutSendTask(SendTask &outTask, SendTaskInfo &outTaskInfo, uint32_t &totalLength);

    // This method is for the consumer; ScheduleOutSendTask must be called at least once before each call to this.
    int FinalizeLastScheduleTask();

    // These two methods influence which task will be scheduled out next time.
    int DelayTaskByTarget(const std::string &inTarget);
    int NoDelayTaskByTarget(const std::string &inTarget);

    uint32_t GetTotalTaskCount() const;
    uint32_t GetNoDelayTaskCount() const;

    void InvalidSendTask(const std::string &target);
    void SetDeviceCommErrCode(const std::string &target, int deviceCommErrCode);

private:
    // Helpers sharing the (outTask, outTaskInfo) out-parameters of the public ScheduleOutSendTask() overload.
    // NOTE(review): presumably they pick from delayed and non-delayed targets respectively - confirm.
    int ScheduleDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo);
    int ScheduleNoDelayTask(SendTask &outTask, SendTaskInfo &outTaskInfo);

    // Declared mutable, which permits locking it from const member functions such as GetTotalTaskCount().
    mutable std::mutex overallMutex_;
    // Running totals; all start at zero.
    uint32_t curTotalSizeByByte_ = 0;
    uint32_t curTotalSizeByTask_ = 0;
    uint32_t delayTaskCount_ = 0;

    // NOTE(review): the containers below are empty on construction; presumably populated by Initialize() - confirm.
    std::vector<Priority> priorityOrder_;
    std::map<Priority, uint32_t> extraCapacityInByteByPrio_;
    // Per-target state, keyed by target identifier.
    std::map<std::string, TargetPolicy> policyMap_;
    std::map<std::string, uint32_t> totalBytesByTarget_;

    // Per-priority state, keyed by Priority.
    std::map<Priority, uint32_t> taskCountByPrio_;
    std::map<Priority, uint32_t> taskDelayCountByPrio_;
    std::map<Priority, std::list<std::string>> taskOrderByPrio_;
    std::map<Priority, TaskListByTarget> taskGroupByPrio_;

    // Record of the most recent schedule-out.
    // NOTE(review): presumably consumed by FinalizeLastScheduleTask() - confirm.
    bool scheduledFlag_ = false;
    std::string lastScheduleTarget_;
    Priority lastSchedulePriority_ = Priority::LOW;

    // Communication error code per target identifier (see SetDeviceCommErrCode()).
    std::map<std::string, int> deviceCommErrCodeMap_;
};
}

#endif