1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATORAGGREGATOR_H 17 #define COMMUNICATORAGGREGATOR_H 18 19 #include <atomic> 20 #include <condition_variable> 21 #include <cstdint> 22 #include <map> 23 #include <mutex> 24 #include <string> 25 #include <thread> 26 #include "frame_combiner.h" 27 #include "frame_retainer.h" 28 #include "iadapter.h" 29 #include "icommunicator.h" 30 #include "icommunicator_aggregator.h" 31 #include "parse_result.h" 32 #include "send_task_scheduler.h" 33 34 namespace DistributedDB { 35 // Forward Declarations 36 class Communicator; 37 class SerialBuffer; 38 class CommunicatorLinker; 39 40 struct TaskConfig { 41 bool nonBlock = true; 42 bool isRetryTask = true; 43 uint32_t timeout = 0u; 44 Priority prio = Priority::NORMAL; 45 AccessInfos infos; 46 }; 47 48 /* 49 * Upper layer Module should comply with calling convention, Inner Module interface will not do excessive check 50 */ 51 class CommunicatorAggregator : public ICommunicatorAggregator { 52 public: 53 CommunicatorAggregator(); 54 ~CommunicatorAggregator() override; 55 56 DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); 57 58 // See ICommunicatorAggregator for detail 59 int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override; 60 61 // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. 62 void Finalize() override; 63 64 ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId = "") override; 65 ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo, 66 const std::string &userId = "") override; 67 68 void ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId = "") override; 69 70 int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override; 71 int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; 72 73 // return optimal allowed data size(Some header is taken into account and subtract) 74 uint32_t GetCommunicatorAggregatorMtuSize() const; 75 uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const; 76 77 // return timeout in range [5s, 60s] 78 uint32_t GetCommunicatorAggregatorTimeout() const; 79 uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const; 80 bool IsDeviceOnline(const std::string &device) const; 81 int GetLocalIdentity(std::string &outTarget) const override; 82 83 // Get the protocol version of remote target. Return -E_NOT_FOUND if no record. 84 int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const; 85 86 // Called by communicator to make itself really in work 87 void ActivateCommunicator(const LabelType &commLabel, const std::string &userId = ""); 88 89 // SerialBuffer surely is heap memory, ScheduleSendTask responsible for lifecycle 90 int ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType, 91 const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr); 92 93 static void EnableCommunicatorNotFoundFeedback(bool isEnable); 94 95 std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo ¶mInfo); 96 97 void ClearOnlineLabel() override; 98 99 void ResetRetryCount(); 100 private: 101 // Working in a dedicated thread 102 void SendDataRoutine(); 103 void SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu, 104 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength); 105 106 int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio); 107 void TaskFinalizer(const SendTask &inTask, int result); 108 void NotifySendableToAllCommunicator(); 109 110 // Call from Adapter by register these function 111 void OnBytesReceive(const ReceiveBytesInfo &receiveBytesInfo, const DataUserInfoProc &userInfoProc); 112 void OnTargetChange(const std::string &target, bool isConnect); 113 void OnSendable(const std::string &target); 114 115 void OnFragmentReceive(const ReceiveBytesInfo &receiveBytesInfo, const ParseResult &inResult, 116 const DataUserInfoProc &userInfoProc); 117 118 int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult); 119 int OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo, const ParseResult &inResult, 120 const DataUserInfoProc &userInfoProc); 121 int OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo, SerialBuffer *&inFrameBuffer, 122 const ParseResult &inResult, const DataUserInfoProc &userInfoProc); 123 124 // Function with suffix NoMutex should be called with mutex in the caller 125 int TryDeliverAppLayerFrameToCommunicatorNoMutex(uint16_t remoteDbVersion, 126 const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo); 127 128 // Auxiliary function for cutting short primary function 129 int RegCallbackToAdapter(); 130 void UnRegCallbackFromAdapter(); 131 void GenerateLocalSourceId(); 132 bool ReGenerateLocalSourceIdIfNeed(); 133 134 // Feedback related functions 135 void TriggerVersionNegotiation(const std::string &dstTarget); 136 void TryToFeedBackWithErr(const std::string &dstTarget, const LabelType &dstLabel, 137 const SerialBuffer *inOriFrame, int inErrCode); 138 void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel, 139 const SerialBuffer *inOriFrame, int inErrCode); 140 void TriggerCommunicatorFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg, 141 int sendErrNo); 142 143 // Record the protocol version of remote target. 144 void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); 145 146 void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos); 147 148 void NotifyConnectChange(const std::string &srcTarget, const std::map<LabelType, bool> &changedLabels); 149 150 void RegDBChangeCallback(); 151 152 void InitSendThread(); 153 154 void SendOnceData(); 155 156 void TriggerSendData(); 157 158 void ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu); 159 160 void RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId); 161 162 void RetrySendTask(const std::string &target, uint64_t sendSequenceId); 163 164 bool IsRetryOutOfLimit(const std::string &target); 165 166 int32_t GetNextRetryInterval(const std::string &target, int32_t currentRetryCount); 167 168 uint64_t GetSendSequenceId(const std::string &target); 169 170 uint64_t IncreaseSendSequenceId(const std::string &target); 171 172 int GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, const DataUserInfoProc &userInfoProc, 173 const std::string &device, UserInfo &userInfo); 174 175 int ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo, 176 SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc, 177 const UserInfo &userInfo); 178 179 DECLARE_OBJECT_TAG(CommunicatorAggregator); 180 181 static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_; 182 183 std::atomic<bool> shutdown_; 184 std::atomic<uint32_t> incFrameId_; 185 std::atomic<uint64_t> localSourceId_; 186 187 // Handle related 188 mutable std::mutex commMapMutex_; 189 // bool true indicate communicator activated 190 std::map<std::string, std::map<LabelType, std::pair<Communicator *, bool>>> commMap_; 191 FrameCombiner combiner_; 192 FrameRetainer retainer_; 193 SendTaskScheduler scheduler_; 194 IAdapter *adapterHandle_ = nullptr; 195 CommunicatorLinker *commLinker_ = nullptr; 196 197 // Thread related 198 std::thread exclusiveThread_; 199 bool wakingSignal_ = false; 200 mutable std::mutex wakingMutex_; 201 std::condition_variable wakingCv_; 202 203 // RetryCreateTask related 204 mutable std::mutex retryMutex_; 205 std::condition_variable retryCv_; 206 207 // Remote target version related 208 mutable std::mutex versionMapMutex_; 209 std::map<std::string, uint16_t> versionMap_; 210 211 // CommLack Callback related 212 CommunicatorLackCallback onCommLackHandle_; 213 Finalizer onCommLackFinalizer_; 214 mutable std::mutex onCommLackMutex_; 215 216 // Connect Callback related 217 OnConnectCallback onConnectHandle_; 218 Finalizer onConnectFinalizer_; 219 mutable std::mutex onConnectMutex_; 220 221 std::shared_ptr<DBStatusAdapter> dbStatusAdapter_; 222 223 std::atomic<bool> useExclusiveThread_ = false; 224 bool sendTaskStart_ = false; 225 mutable std::mutex scheduleSendTaskMutex_; 226 std::condition_variable finalizeCv_; 227 228 struct FrameSendRecord { 229 uint32_t splitMtu = 0u; 230 uint32_t sendIndex = 0u; 231 }; 232 std::mutex sendRecordMutex_; 233 std::map<uint32_t, FrameSendRecord> sendRecord_; 234 235 std::mutex retryCountMutex_; 236 std::map<std::string, int32_t> retryCount_; 237 238 std::mutex sendSequenceMutex_; 239 std::map<std::string, uint64_t> sendSequence_; 240 }; 241 } // namespace DistributedDB 242 243 #endif // COMMUNICATORAGGREGATOR_H 244