• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef COMMUNICATORAGGREGATOR_H
17 #define COMMUNICATORAGGREGATOR_H
18 
19 #include <map>
20 #include <mutex>
21 #include <string>
22 #include <atomic>
23 #include <thread>
24 #include <cstdint>
25 #include <condition_variable>
26 #include "iadapter.h"
27 #include "parse_result.h"
28 #include "icommunicator.h"
29 #include "frame_combiner.h"
30 #include "frame_retainer.h"
31 #include "send_task_scheduler.h"
32 #include "icommunicator_aggregator.h"
33 
34 namespace DistributedDB {
35 // Forward Declarations
36 class Communicator;
37 class SerialBuffer;
38 class CommunicatorLinker;
39 
// Per-task options handed to CommunicatorAggregator::CreateSendTask (its `inConfig` parameter).
// No member has a default initializer, so every field must be set explicitly by whoever builds a TaskConfig.
40 struct TaskConfig {
41     bool nonBlock;      // non-blocking flag; NOTE(review): exact blocking semantics live in CreateSendTask - confirm
42     uint32_t timeout;   // timeout for the task; unit is not visible in this header - TODO confirm
43     Priority prio;      // priority of the task (Priority is declared elsewhere)
44 };
45 
46 /*
47  * Upper-layer modules must comply with the calling convention; inner-module interfaces do not perform excessive checks.
48  */
// Concrete implementation of the ICommunicatorAggregator interface.
// As declared below, it holds a LabelType -> Communicator map (commMap_), raw IAdapter and CommunicatorLinker
// pointers (ownership is not visible in this header), a FrameCombiner, a FrameRetainer, a SendTaskScheduler,
// and a std::thread member (exclusiveThread_) together with mutex/condition-variable pairs.
49 class CommunicatorAggregator : public ICommunicatorAggregator {
50 public:
51     CommunicatorAggregator();
52     ~CommunicatorAggregator() override;
53 
       // NOTE(review): macro defined elsewhere; by its name it presumably disables copy, assignment and move - confirm.
54     DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator);
55 
56     // See ICommunicatorAggregator for detail
57     int Initialize(IAdapter *inAdapter) override;
58 
59     // No other member function may be called once Finalize has been called.
       // (Original author's note: in fact, Finalize has no chance to be called.)
60     void Finalize() override;
61 
       // Two overloads: the communicator is identified either by a 64-bit label or by a LabelType.
       // An error code is reported through outErrorNo; the result is returned as an ICommunicator pointer.
62     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override;
63     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override;
64 
       // NOTE(review): presumably the counterpart of AllocCommunicator - confirm against the implementation.
65     void ReleaseCommunicator(ICommunicator *inCommunicator) override;
66 
       // Register a callback together with a Finalizer (inOper); the handle/finalizer pairs are stored in the
       // onCommLack*_ and onConnect*_ members declared at the end of this class.
67     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
68     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
69 
70     // Returns the optimal allowed data size (header overhead has already been taken into account and subtracted)
71     uint32_t GetCommunicatorAggregatorMtuSize() const;
72     uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const;
73 
74     // Returns a timeout in the range [5s, 60s]
75     uint32_t GetCommunicatorAggregatorTimeout() const;
76     uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const;
77     bool IsDeviceOnline(const std::string &device) const;
       // Writes the local identity into outTarget and returns an int status code.
78     int GetLocalIdentity(std::string &outTarget) const override;
79 
80     // Get the protocol version of the remote target. Returns -E_NOT_FOUND if there is no record.
81     int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const;
82 
83     // Called by a communicator to put itself into actual work
       // (commMap_ below keeps a per-communicator "activated" flag).
84     void ActivateCommunicator(const LabelType &commLabel);
85 
86     // inBuff (SerialBuffer) must be heap memory; CreateSendTask is responsible for its lifecycle.
       // onEnd is optional and defaults to nullptr.
87     int CreateSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType,
88         const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr);
89 
       // Static switch; the flag it controls is presumably isCommunicatorNotFoundFeedbackEnable_ below - confirm.
90     static void EnableCommunicatorNotFoundFeedback(bool isEnable);
91 
92     std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo &paramInfo);
93 
94 private:
95     // Working in a dedicated thread
96     void SendDataRoutine();
       // eachPacket: one entry per packet, a byte pointer paired with two uint32_t values.
       // NOTE(review): meaning of the two uint32_t values is not visible in this header - confirm.
97     void SendPacketsAndDisposeTask(const SendTask &inTask,
98         const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket);
99 
100     int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio);
101     void TaskFinalizer(const SendTask &inTask, int result);
102     void NotifySendableToAllCommunicator();
103 
104     // Called by the adapter; these functions are registered with it as callbacks
105     void OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
106         const std::string &userId);
107     void OnTargetChange(const std::string &target, bool isConnect);
108     void OnSendable(const std::string &target);
109 
110     void OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
111         const ParseResult &inResult, const std::string &userId);
112 
113     int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult);
        // Two overloads: one takes the raw bytes and length, the other an already built SerialBuffer
        // (passed as a reference to pointer, so the callee is able to reset the caller's pointer).
114     int OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
115         uint32_t length, const ParseResult &inResult, const std::string &userId);
116     int OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
117         const ParseResult &inResult, const std::string &userId);
118 
119     // Functions with the NoMutex suffix must be called with the mutex already held by the caller
        // NOTE(review): presumably commMapMutex_ - confirm.
120     int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
121         const LabelType &toLabel);
122 
123     // Auxiliary functions that keep the primary functions short
124     int RegCallbackToAdapter();
125     void UnRegCallbackFromAdapter();
126     void GenerateLocalSourceId();
127     bool ReGenerateLocalSourceIdIfNeed();
128 
129     // Feedback related functions
130     void TriggerVersionNegotiation(const std::string &dstTarget);
131     void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel,
132         const SerialBuffer *inOriFrame);
133     void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg);
134 
135     // Record the protocol version of remote target.
136     void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version);
137 
138     DECLARE_OBJECT_TAG(CommunicatorAggregator);
139 
140     static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_;
141 
        // These three atomics have no in-class initializer; any initial value must come from the
        // constructor, which is defined outside this header.
142     std::atomic<bool> shutdown_;
143     std::atomic<uint32_t> incFrameId_;
144     std::atomic<uint64_t> localSourceId_;
145 
146     // Handle related
147     mutable std::mutex commMapMutex_;
148     std::map<LabelType, std::pair<Communicator *, bool>> commMap_; // bool is true once the communicator is activated
149     FrameCombiner combiner_;
150     FrameRetainer retainer_;
151     SendTaskScheduler scheduler_;
152     IAdapter *adapterHandle_ = nullptr;
153     CommunicatorLinker *commLinker_ = nullptr;
154 
155     // Thread related
156     std::thread exclusiveThread_;
157     bool wakingSignal_ = false;
158     mutable std::mutex wakingMutex_;
159     std::condition_variable wakingCv_;
160 
161     // RetryCreateTask related
162     mutable std::mutex retryMutex_;
163     std::condition_variable retryCv_;
164 
165     // Remote target version related
166     mutable std::mutex versionMapMutex_;
167     std::map<std::string, uint16_t> versionMap_; // remote target -> protocol version
168 
169     // CommLack Callback related
170     CommunicatorLackCallback onCommLackHandle_;
171     Finalizer onCommLackFinalizer_;
172     mutable std::mutex onCommLackMutex_;
173 
174     // Connect Callback related
175     OnConnectCallback onConnectHandle_;
176     Finalizer onConnectFinalizer_;
177     mutable std::mutex onConnectMutex_;
178 };
179 } // namespace DistributedDB
180 
181 #endif // COMMUNICATORAGGREGATOR_H
182