• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_ICOMMUNICATORAGGREGATOR_H
17 #define VIRTUAL_ICOMMUNICATORAGGREGATOR_H
18 
19 #include <cstdint>
20 #include <set>
21 
22 #include "icommunicator_aggregator.h"
23 #include "virtual_communicator.h"
24 
25 namespace DistributedDB {
26 class ICommunicator;  // Forward Declaration
27 using AllocCommunicatorCallback = std::function<void(const std::string &userId)>;
28 using ReleaseCommunicatorCallback = std::function<void(const std::string &userId)>;
29 class VirtualCommunicatorAggregator : public ICommunicatorAggregator {
30 public:
31     // Return 0 as success. Return negative as error
32     int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override;
33 
34     void Finalize() override;
35 
36     // If not success, return nullptr and set outErrorNo
37     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId = "") override;
38     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo,
39         const std::string &userId = "") override;
40 
41     void ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId = "") override;
42 
43     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
44     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
45     void RunCommunicatorLackCallback(const LabelType &commLabel);
46     void RunOnConnectCallback(const std::string &target, bool isConnect);
47 
48     int GetLocalIdentity(std::string &outTarget) const override;
49 
50     // online a virtual device to the VirtualCommunicator, should call in main thread
51     void OnlineDevice(const std::string &deviceId) const;
52 
53     // offline a virtual device to the VirtualCommunicator, should call in main thread
54     void OfflineDevice(const std::string &deviceId) const;
55 
56     void DispatchMessage(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
57         const OnSendEnd &onEnd);
58 
59     // If not success, return nullptr and set outErrorNo
60     ICommunicator *AllocCommunicator(const std::string &deviceId, int &outErrorNo);
61 
62     ICommunicator *GetCommunicator(const std::string &deviceId) const;
63 
64     void Disable();
65 
66     void Enable();
67 
68     void SetBlockValue(bool value);
69 
70     bool GetBlockValue() const;
71 
72     void RegOnDispatch(const std::function<void(const std::string &target, Message *inMsg)> &onDispatch);
73 
74     void SetCurrentUserId(const std::string &userId);
75 
76     void SetTimeout(const std::string &deviceId, uint32_t timeout);
77 
78     void SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, uint32_t dropTimes = 1);
79 
80     void SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize);
81 
82     void SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId, uint32_t delayTimes, uint32_t skipTimes,
83         std::set<std::string> &delayDevices);
84     void ResetSendDelayInfo();
85 
86     std::set<std::string> GetOnlineDevices();
87 
88     void DisableCommunicator();
89 
90     void EnableCommunicator();
91 
92     void RegBeforeDispatch(const std::function<void(const std::string &, const Message *)> &beforeDispatch);
93 
94     void SetLocalDeviceId(const std::string &deviceId);
95 
96     void MockGetLocalDeviceRes(int mockRes);
97 
98     void SetAllocCommunicatorCallback(AllocCommunicatorCallback allocCommunicatorCallback);
99 
100     void SetReleaseCommunicatorCallback(ReleaseCommunicatorCallback releaseCommunicatorCallback);
101 
102     void MockCommErrCode(int mockErrCode);
103 
104     void MockDirectEndFlag(bool isDirectEnd);
105 
106     void ClearOnlineLabel() override;
107 
108     void SetRemoteDeviceId(const std::string &dev);
109 
110     uint64_t GetAllSendMsgSize() const;
111 
112     ~VirtualCommunicatorAggregator() override = default;
113     VirtualCommunicatorAggregator() = default;
114 
115 private:
116     void CallSendEnd(int errCode, const OnSendEnd &onEnd);
117     void DelayTimeHandle(uint32_t messageId, const std::string &dstTarget);
118     void DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
119         const OnSendEnd &onEnd);
120 
121     mutable std::mutex communicatorsLock_;
122     std::map<std::string, VirtualCommunicator *> communicators_;
123     std::string remoteDeviceId_ = "real_device";
124     std::mutex blockLock_;
125     std::condition_variable conditionVar_;
126     bool isEnable_ = true;
127     bool isBlock_ = false;
128     CommunicatorLackCallback onCommLack_;
129     OnConnectCallback onConnect_;
130     std::function<void(const std::string &target, Message *inMsg)> onDispatch_;
131     std::function<void(const std::string &target, const Message *inMsg)> beforeDispatch_;
132     std::string userId_;
133 
134     uint32_t sendDelayTime_ = 0;
135     uint32_t delayMessageId_ = INVALID_MESSAGE_ID;
136     uint32_t delayTimes_ = 0; // ms
137     uint32_t skipTimes_ = 0;
138     std::set<std::string> delayDevices_;
139 
140     mutable std::mutex localDeviceIdMutex_;
141     std::string localDeviceId_;
142     int getLocalDeviceRet_ = E_OK;
143     int commErrCodeMock_ = E_OK;
144     bool isDirectEnd_ = true;
145 
146     AllocCommunicatorCallback allocCommunicatorCallback_;
147     ReleaseCommunicatorCallback releaseCommunicatorCallback_;
148 };
149 } // namespace DistributedDB
150 
151 #endif // VIRTUAL_ICOMMUNICATORAGGREGATOR_H