1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATORAGGREGATOR_H 17 #define COMMUNICATORAGGREGATOR_H 18 19 #include <map> 20 #include <mutex> 21 #include <string> 22 #include <atomic> 23 #include <thread> 24 #include <cstdint> 25 #include <condition_variable> 26 #include "iadapter.h" 27 #include "parse_result.h" 28 #include "icommunicator.h" 29 #include "frame_combiner.h" 30 #include "frame_retainer.h" 31 #include "send_task_scheduler.h" 32 #include "icommunicator_aggregator.h" 33 34 namespace DistributedDB { 35 // Forward Declarations 36 class Communicator; 37 class SerialBuffer; 38 class CommunicatorLinker; 39 40 struct TaskConfig { 41 bool nonBlock; 42 uint32_t timeout; 43 Priority prio; 44 }; 45 46 /* 47 * Upper layer Module should comply with calling convention, Inner Module interface will not do excessive check 48 */ 49 class CommunicatorAggregator : public ICommunicatorAggregator { 50 public: 51 CommunicatorAggregator(); 52 ~CommunicatorAggregator() override; 53 54 DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); 55 56 // See ICommunicatorAggregator for detail 57 int Initialize(IAdapter *inAdapter) override; 58 59 // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. 60 void Finalize() override; 61 62 ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override; 63 ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override; 64 65 void ReleaseCommunicator(ICommunicator *inCommunicator) override; 66 67 int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override; 68 int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; 69 70 // return optimal allowed data size(Some header is taken into account and subtract) 71 uint32_t GetCommunicatorAggregatorMtuSize() const; 72 uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const; 73 74 // return timeout in range [5s, 60s] 75 uint32_t GetCommunicatorAggregatorTimeout() const; 76 uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const; 77 bool IsDeviceOnline(const std::string &device) const; 78 int GetLocalIdentity(std::string &outTarget) const override; 79 80 // Get the protocol version of remote target. Return -E_NOT_FOUND if no record. 81 int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const; 82 83 // Called by communicator to make itself really in work 84 void ActivateCommunicator(const LabelType &commLabel); 85 86 // SerialBuffer surely is heap memory, CreateSendTask responsible for lifecycle 87 int CreateSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType, 88 const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr); 89 90 static void EnableCommunicatorNotFoundFeedback(bool isEnable); 91 92 std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo ¶mInfo); 93 94 private: 95 // Working in a dedicated thread 96 void SendDataRoutine(); 97 void SendPacketsAndDisposeTask(const SendTask &inTask, 98 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket); 99 100 int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio); 101 void TaskFinalizer(const SendTask &inTask, int result); 102 void NotifySendableToAllCommunicator(); 103 104 // Call from Adapter by register these function 105 void OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length, 106 const std::string &userId); 107 void OnTargetChange(const std::string &target, bool isConnect); 108 void OnSendable(const std::string &target); 109 110 void OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length, 111 const ParseResult &inResult, const std::string &userId); 112 113 int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult); 114 int OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes, 115 uint32_t length, const ParseResult &inResult, const std::string &userId); 116 int OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, 117 const ParseResult &inResult, const std::string &userId); 118 119 // Function with suffix NoMutex should be called with mutex in the caller 120 int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, 121 const LabelType &toLabel); 122 123 // Auxiliary function for cutting short primary function 124 int RegCallbackToAdapter(); 125 void UnRegCallbackFromAdapter(); 126 void GenerateLocalSourceId(); 127 bool ReGenerateLocalSourceIdIfNeed(); 128 129 // Feedback related functions 130 void TriggerVersionNegotiation(const std::string &dstTarget); 131 void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel, 132 const SerialBuffer *inOriFrame); 133 void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg); 134 135 // Record the protocol version of remote target. 136 void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); 137 138 DECLARE_OBJECT_TAG(CommunicatorAggregator); 139 140 static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_; 141 142 std::atomic<bool> shutdown_; 143 std::atomic<uint32_t> incFrameId_; 144 std::atomic<uint64_t> localSourceId_; 145 146 // Handle related 147 mutable std::mutex commMapMutex_; 148 std::map<LabelType, std::pair<Communicator *, bool>> commMap_; // bool true indicate communicator activated 149 FrameCombiner combiner_; 150 FrameRetainer retainer_; 151 SendTaskScheduler scheduler_; 152 IAdapter *adapterHandle_ = nullptr; 153 CommunicatorLinker *commLinker_ = nullptr; 154 155 // Thread related 156 std::thread exclusiveThread_; 157 bool wakingSignal_ = false; 158 mutable std::mutex wakingMutex_; 159 std::condition_variable wakingCv_; 160 161 // RetryCreateTask related 162 mutable std::mutex retryMutex_; 163 std::condition_variable retryCv_; 164 165 // Remote target version related 166 mutable std::mutex versionMapMutex_; 167 std::map<std::string, uint16_t> versionMap_; 168 169 // CommLack Callback related 170 CommunicatorLackCallback onCommLackHandle_; 171 Finalizer onCommLackFinalizer_; 172 mutable std::mutex onCommLackMutex_; 173 174 // Connect Callback related 175 OnConnectCallback onConnectHandle_; 176 Finalizer onConnectFinalizer_; 177 mutable std::mutex onConnectMutex_; 178 }; 179 } // namespace DistributedDB 180 181 #endif // COMMUNICATORAGGREGATOR_H 182