1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATORAGGREGATOR_H 17 #define COMMUNICATORAGGREGATOR_H 18 19 #include <atomic> 20 #include <condition_variable> 21 #include <cstdint> 22 #include <map> 23 #include <mutex> 24 #include <string> 25 #include <thread> 26 #include "frame_combiner.h" 27 #include "frame_retainer.h" 28 #include "iadapter.h" 29 #include "icommunicator.h" 30 #include "icommunicator_aggregator.h" 31 #include "parse_result.h" 32 #include "send_task_scheduler.h" 33 34 namespace DistributedDB { 35 // Forward Declarations 36 class Communicator; 37 class SerialBuffer; 38 class CommunicatorLinker; 39 40 struct TaskConfig { 41 bool nonBlock = true; 42 uint32_t timeout = 0u; 43 Priority prio = Priority::NORMAL; 44 }; 45 46 /* 47 * Upper layer Module should comply with calling convention, Inner Module interface will not do excessive check 48 */ 49 class CommunicatorAggregator : public ICommunicatorAggregator { 50 public: 51 CommunicatorAggregator(); 52 ~CommunicatorAggregator() override; 53 54 DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); 55 56 // See ICommunicatorAggregator for detail 57 int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override; 58 59 // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. 60 void Finalize() override; 61 62 ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId = "") override; 63 ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo, 64 const std::string &userId = "") override; 65 66 void ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId = "") override; 67 68 int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override; 69 int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; 70 71 // return optimal allowed data size(Some header is taken into account and subtract) 72 uint32_t GetCommunicatorAggregatorMtuSize() const; 73 uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const; 74 75 // return timeout in range [5s, 60s] 76 uint32_t GetCommunicatorAggregatorTimeout() const; 77 uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const; 78 bool IsDeviceOnline(const std::string &device) const; 79 int GetLocalIdentity(std::string &outTarget) const override; 80 81 // Get the protocol version of remote target. Return -E_NOT_FOUND if no record. 82 int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const; 83 84 // Called by communicator to make itself really in work 85 void ActivateCommunicator(const LabelType &commLabel, const std::string &userId = ""); 86 87 // SerialBuffer surely is heap memory, ScheduleSendTask responsible for lifecycle 88 int ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType, 89 const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr); 90 91 static void EnableCommunicatorNotFoundFeedback(bool isEnable); 92 93 std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo ¶mInfo); 94 95 private: 96 // Working in a dedicated thread 97 void SendDataRoutine(); 98 void SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu, 99 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength); 100 101 int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio); 102 void TaskFinalizer(const SendTask &inTask, int result); 103 void NotifySendableToAllCommunicator(); 104 105 // Call from Adapter by register these function 106 void OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length, 107 const DataUserInfoProc &userInfoProc); 108 void OnTargetChange(const std::string &target, bool isConnect); 109 void OnSendable(const std::string &target); 110 111 void OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length, 112 const ParseResult &inResult, const DataUserInfoProc &userInfoProc); 113 114 int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult); 115 int OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes, 116 uint32_t length, const ParseResult &inResult, const DataUserInfoProc &userInfoProc); 117 int OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, 118 const ParseResult &inResult, const DataUserInfoProc &userInfoProc); 119 120 // Function with suffix NoMutex should be called with mutex in the caller 121 int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, 122 const LabelType &toLabel, const std::string &userId = ""); 123 124 // Auxiliary function for cutting short primary function 125 int RegCallbackToAdapter(); 126 void UnRegCallbackFromAdapter(); 127 void GenerateLocalSourceId(); 128 bool ReGenerateLocalSourceIdIfNeed(); 129 130 // Feedback related functions 131 void TriggerVersionNegotiation(const std::string &dstTarget); 132 void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel, 133 const SerialBuffer *inOriFrame); 134 void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg); 135 136 // Record the protocol version of remote target. 137 void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); 138 139 void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos); 140 141 void NotifyConnectChange(const std::string &srcTarget, const std::map<LabelType, bool> &changedLabels); 142 143 void RegDBChangeCallback(); 144 145 void InitSendThread(); 146 147 void SendOnceData(); 148 149 void TriggerSendData(); 150 151 void ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu); 152 153 void RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId); 154 155 void RetrySendTask(const std::string &target, uint64_t sendSequenceId); 156 157 bool IsRetryOutOfLimit(const std::string &target); 158 159 int32_t GetNextRetryInterval(const std::string &target, int32_t currentRetryCount); 160 161 uint64_t GetSendSequenceId(const std::string &target); 162 163 uint64_t IncreaseSendSequenceId(const std::string &target); 164 165 int GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, const DataUserInfoProc &userInfoProc, 166 std::string &userId); 167 168 DECLARE_OBJECT_TAG(CommunicatorAggregator); 169 170 static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_; 171 172 std::atomic<bool> shutdown_; 173 std::atomic<uint32_t> incFrameId_; 174 std::atomic<uint64_t> localSourceId_; 175 176 // Handle related 177 mutable std::mutex commMapMutex_; 178 // bool true indicate communicator activated 179 std::map<std::string, std::map<LabelType, std::pair<Communicator *, bool>>> commMap_; 180 FrameCombiner combiner_; 181 FrameRetainer retainer_; 182 SendTaskScheduler scheduler_; 183 IAdapter *adapterHandle_ = nullptr; 184 CommunicatorLinker *commLinker_ = nullptr; 185 186 // Thread related 187 std::thread exclusiveThread_; 188 bool wakingSignal_ = false; 189 mutable std::mutex wakingMutex_; 190 std::condition_variable wakingCv_; 191 192 // RetryCreateTask related 193 mutable std::mutex retryMutex_; 194 std::condition_variable retryCv_; 195 196 // Remote target version related 197 mutable std::mutex versionMapMutex_; 198 std::map<std::string, uint16_t> versionMap_; 199 200 // CommLack Callback related 201 CommunicatorLackCallback onCommLackHandle_; 202 Finalizer onCommLackFinalizer_; 203 mutable std::mutex onCommLackMutex_; 204 205 // Connect Callback related 206 OnConnectCallback onConnectHandle_; 207 Finalizer onConnectFinalizer_; 208 mutable std::mutex onConnectMutex_; 209 210 std::shared_ptr<DBStatusAdapter> dbStatusAdapter_; 211 212 std::atomic<bool> useExclusiveThread_ = false; 213 bool sendTaskStart_ = false; 214 mutable std::mutex scheduleSendTaskMutex_; 215 std::condition_variable finalizeCv_; 216 217 struct FrameSendRecord { 218 uint32_t splitMtu = 0u; 219 uint32_t sendIndex = 0u; 220 }; 221 std::mutex sendRecordMutex_; 222 std::map<uint32_t, FrameSendRecord> sendRecord_; 223 224 std::mutex retryCountMutex_; 225 std::map<std::string, int32_t> retryCount_; 226 227 std::mutex sendSequenceMutex_; 228 std::map<std::string, uint64_t> sendSequence_; 229 }; 230 } // namespace DistributedDB 231 232 #endif // COMMUNICATORAGGREGATOR_H 233