• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef COMMUNICATORAGGREGATOR_H
17 #define COMMUNICATORAGGREGATOR_H
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <cstdint>
22 #include <map>
23 #include <mutex>
24 #include <string>
25 #include <thread>
26 #include "frame_combiner.h"
27 #include "frame_retainer.h"
28 #include "iadapter.h"
29 #include "icommunicator.h"
30 #include "icommunicator_aggregator.h"
31 #include "parse_result.h"
32 #include "send_task_scheduler.h"
33 
34 namespace DistributedDB {
35 // Forward Declarations
36 class Communicator;
37 class SerialBuffer;
38 class CommunicatorLinker;
39 
40 struct TaskConfig {
41     bool nonBlock = true;
42     bool isRetryTask = true;
43     uint32_t timeout = 0u;
44     Priority prio = Priority::NORMAL;
45     AccessInfos infos;
46 };
47 
48 /*
49  * Upper layer Module should comply with calling convention, Inner Module interface will not do excessive check
50  */
51 class CommunicatorAggregator : public ICommunicatorAggregator {
52 public:
53     CommunicatorAggregator();
54     ~CommunicatorAggregator() override;
55 
56     DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator);
57 
58     // See ICommunicatorAggregator for detail
59     int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override;
60 
61     // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called.
62     void Finalize() override;
63 
64     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId = "") override;
65     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo,
66         const std::string &userId = "") override;
67 
68     void ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId = "") override;
69 
70     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
71     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
72 
73     // return optimal allowed data size(Some header is taken into account and subtract)
74     uint32_t GetCommunicatorAggregatorMtuSize() const;
75     uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const;
76 
77     // return timeout in range [5s, 60s]
78     uint32_t GetCommunicatorAggregatorTimeout() const;
79     uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const;
80     bool IsDeviceOnline(const std::string &device) const;
81     int GetLocalIdentity(std::string &outTarget) const override;
82 
83     // Get the protocol version of remote target. Return -E_NOT_FOUND if no record.
84     int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const;
85 
86     // Called by communicator to make itself really in work
87     void ActivateCommunicator(const LabelType &commLabel, const std::string &userId = "");
88 
89     // SerialBuffer surely is heap memory, ScheduleSendTask responsible for lifecycle
90     int ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType,
91         const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr);
92 
93     static void EnableCommunicatorNotFoundFeedback(bool isEnable);
94 
95     std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo &paramInfo);
96 
97     void ClearOnlineLabel() override;
98 
99     void ResetRetryCount();
100 private:
101     // Working in a dedicated thread
102     void SendDataRoutine();
103     void SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
104         const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength);
105 
106     int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio);
107     void TaskFinalizer(const SendTask &inTask, int result);
108     void NotifySendableToAllCommunicator();
109 
110     // Call from Adapter by register these function
111     void OnBytesReceive(const ReceiveBytesInfo &receiveBytesInfo, const DataUserInfoProc &userInfoProc);
112     void OnTargetChange(const std::string &target, bool isConnect);
113     void OnSendable(const std::string &target);
114 
115     void OnFragmentReceive(const ReceiveBytesInfo &receiveBytesInfo, const ParseResult &inResult,
116         const DataUserInfoProc &userInfoProc);
117 
118     int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult);
119     int OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo, const ParseResult &inResult,
120         const DataUserInfoProc &userInfoProc);
121     int OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo, SerialBuffer *&inFrameBuffer,
122         const ParseResult &inResult, const DataUserInfoProc &userInfoProc);
123 
124     // Function with suffix NoMutex should be called with mutex in the caller
125     int TryDeliverAppLayerFrameToCommunicatorNoMutex(uint16_t remoteDbVersion,
126         const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo);
127 
128     // Auxiliary function for cutting short primary function
129     int RegCallbackToAdapter();
130     void UnRegCallbackFromAdapter();
131     void GenerateLocalSourceId();
132     bool ReGenerateLocalSourceIdIfNeed();
133 
134     // Feedback related functions
135     void TriggerVersionNegotiation(const std::string &dstTarget);
136     void TryToFeedBackWithErr(const std::string &dstTarget, const LabelType &dstLabel,
137         const SerialBuffer *inOriFrame, int inErrCode);
138     void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel,
139         const SerialBuffer *inOriFrame, int inErrCode);
140     void TriggerCommunicatorFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg,
141         int sendErrNo);
142 
143     // Record the protocol version of remote target.
144     void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version);
145 
146     void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos);
147 
148     void NotifyConnectChange(const std::string &srcTarget, const std::map<LabelType, bool> &changedLabels);
149 
150     void RegDBChangeCallback();
151 
152     void InitSendThread();
153 
154     void SendOnceData();
155 
156     void TriggerSendData();
157 
158     void ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu);
159 
160     void RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId);
161 
162     void RetrySendTask(const std::string &target, uint64_t sendSequenceId);
163 
164     bool IsRetryOutOfLimit(const std::string &target);
165 
166     int32_t GetNextRetryInterval(const std::string &target, int32_t currentRetryCount);
167 
168     uint64_t GetSendSequenceId(const std::string &target);
169 
170     uint64_t IncreaseSendSequenceId(const std::string &target);
171 
172     int GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, const DataUserInfoProc &userInfoProc,
173         const std::string &device, UserInfo &userInfo);
174 
175     int ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo,
176         SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc,
177         const UserInfo &userInfo);
178 
179     DECLARE_OBJECT_TAG(CommunicatorAggregator);
180 
181     static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_;
182 
183     std::atomic<bool> shutdown_;
184     std::atomic<uint32_t> incFrameId_;
185     std::atomic<uint64_t> localSourceId_;
186 
187     // Handle related
188     mutable std::mutex commMapMutex_;
189     // bool true indicate communicator activated
190     std::map<std::string, std::map<LabelType, std::pair<Communicator *, bool>>> commMap_;
191     FrameCombiner combiner_;
192     FrameRetainer retainer_;
193     SendTaskScheduler scheduler_;
194     IAdapter *adapterHandle_ = nullptr;
195     CommunicatorLinker *commLinker_ = nullptr;
196 
197     // Thread related
198     std::thread exclusiveThread_;
199     bool wakingSignal_ = false;
200     mutable std::mutex wakingMutex_;
201     std::condition_variable wakingCv_;
202 
203     // RetryCreateTask related
204     mutable std::mutex retryMutex_;
205     std::condition_variable retryCv_;
206 
207     // Remote target version related
208     mutable std::mutex versionMapMutex_;
209     std::map<std::string, uint16_t> versionMap_;
210 
211     // CommLack Callback related
212     CommunicatorLackCallback onCommLackHandle_;
213     Finalizer onCommLackFinalizer_;
214     mutable std::mutex onCommLackMutex_;
215 
216     // Connect Callback related
217     OnConnectCallback onConnectHandle_;
218     Finalizer onConnectFinalizer_;
219     mutable std::mutex onConnectMutex_;
220 
221     std::shared_ptr<DBStatusAdapter> dbStatusAdapter_;
222 
223     std::atomic<bool> useExclusiveThread_ = false;
224     bool sendTaskStart_ = false;
225     mutable std::mutex scheduleSendTaskMutex_;
226     std::condition_variable finalizeCv_;
227 
228     struct FrameSendRecord {
229         uint32_t splitMtu = 0u;
230         uint32_t sendIndex = 0u;
231     };
232     std::mutex sendRecordMutex_;
233     std::map<uint32_t, FrameSendRecord> sendRecord_;
234 
235     std::mutex retryCountMutex_;
236     std::map<std::string, int32_t> retryCount_;
237 
238     std::mutex sendSequenceMutex_;
239     std::map<std::string, uint64_t> sendSequence_;
240 };
241 } // namespace DistributedDB
242 
243 #endif // COMMUNICATORAGGREGATOR_H
244