• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_ICOMMUNICATORAGGREGATOR_H
17 #define VIRTUAL_ICOMMUNICATORAGGREGATOR_H
18 
19 #include <cstdint>
20 #include <set>
21 
22 #include "icommunicator_aggregator.h"
23 #include "virtual_communicator.h"
24 
25 namespace DistributedDB {
26 class ICommunicator;  // Forward Declaration
27 
// VirtualCommunicatorAggregator: an ICommunicatorAggregator implementation that keeps a
// map from a string key to VirtualCommunicator pointers (see communicators_).
// NOTE(review): presumably a test double that routes messages between virtual devices
// in-process; the member definitions are not in this header -- confirm against the .cpp.
28 class VirtualCommunicatorAggregator : public ICommunicatorAggregator {
29 public:
30     // Returns 0 on success and a negative value on error.
31     int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override;
32 
33     void Finalize() override;
34 
35     // On failure, returns nullptr and sets outErrorNo.
36     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override;
37     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override;
38 
39     void ReleaseCommunicator(ICommunicator *inCommunicator) override;
40 
    // Callback registration (overrides) and the matching non-virtual "Run" helpers.
    // NOTE(review): presumably the Run* helpers invoke onCommLack_ / onConnect_ -- confirm.
41     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
42     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
43     void RunCommunicatorLackCallback(const LabelType &commLabel);
44     void RunOnConnectCallback(const std::string &target, bool isConnect);
45 
    // Writes the local identity into outTarget and returns an int status.
    // NOTE(review): likely driven by localDeviceId_ / getLocalDeviceRet_ -- confirm.
46     int GetLocalIdentity(std::string &outTarget) const override;
47 
48     // Brings a virtual device online in the VirtualCommunicator; should be called from the main thread.
49     void OnlineDevice(const std::string &deviceId) const;
50 
51     // Takes a virtual device offline in the VirtualCommunicator; should be called from the main thread.
52     void OfflineDevice(const std::string &deviceId) const;
53 
    // Public dispatch entry point; inMsg is passed as a pointer to const and onEnd is a
    // send-end callback. A private DispatchMessageInner with the same parameters exists below.
54     void DispatchMessage(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
55         const OnSendEnd &onEnd);
56 
57     // On failure, returns nullptr and sets outErrorNo.
    // Non-virtual overload keyed by a device-id string rather than a communicator label.
58     ICommunicator *AllocCommunicator(const std::string &deviceId, int &outErrorNo);
59 
    // NOTE(review): presumably looks deviceId up in communicators_; the result for an
    // unknown device is not visible here -- confirm.
60     ICommunicator *GetCommunicator(const std::string &deviceId) const;
61 
    // NOTE(review): presumably these toggle isEnable_ -- confirm.
62     void Disable();
63 
64     void Enable();
65 
    // NOTE(review): presumably accessors for isBlock_ (with blockLock_ / conditionVar_) -- confirm.
66     void SetBlockValue(bool value);
67 
68     bool GetBlockValue() const;
69 
    // Registers a hook that receives the target string and a mutable Message pointer.
70     void RegOnDispatch(const std::function<void(const std::string &target, Message *inMsg)> &onDispatch);
71 
72     void SetCurrentUserId(const std::string &userId);
73 
    // Per-device settings; the unit of timeout is not stated in this header.
74     void SetTimeout(const std::string &deviceId, uint32_t timeout);
75 
    // dropTimes defaults to 1.
76     void SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, uint32_t dropTimes = 1);
77 
78     void SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize);
79 
    // Configures the send-delay fields declared in the private section.
    // NOTE(review): delayDevices is taken by non-const reference; whether it is modified
    // is not visible from this header.
80     void SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId, uint32_t delayTimes, uint32_t skipTimes,
81         std::set<std::string> &delayDevices);
82     void ResetSendDelayInfo();
83 
    // Returns the set of device-id strings by value.
84     std::set<std::string> GetOnlineDevices();
85 
86     void DisableCommunicator();
87 
88     void EnableCommunicator();
89 
    // Registers a hook that receives the target string and a pointer to a const Message.
90     void RegBeforeDispatch(const std::function<void(const std::string &, const Message *)> &beforeDispatch);
91 
92     void SetLocalDeviceId(const std::string &deviceId);
93 
    // NOTE(review): presumably sets getLocalDeviceRet_, the value GetLocalIdentity reports -- confirm.
94     void MockGetLocalDeviceRes(int mockRes);
95 
96     ~VirtualCommunicatorAggregator() override = default;
97     VirtualCommunicatorAggregator() = default;
98 
99 private:
100     void CallSendEnd(int errCode, const OnSendEnd &onEnd);
101     void DelayTimeHandle(uint32_t messageId, const std::string &dstTarget);
102     void DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
103         const OnSendEnd &onEnd);
104 
    // Declared mutable so it can be locked from const member functions.
    // NOTE(review): presumably guards communicators_ -- confirm.
105     mutable std::mutex communicatorsLock_;
    // Non-owning-looking raw pointers; ownership and lifetime are not visible in this header.
106     std::map<std::string, VirtualCommunicator *> communicators_;
107     std::string remoteDeviceId_ = "real_device";
108     std::mutex blockLock_;
109     std::condition_variable conditionVar_;
    // Enabled by default, not blocked by default.
110     bool isEnable_ = true;
111     bool isBlock_ = false;
112     CommunicatorLackCallback onCommLack_;
113     OnConnectCallback onConnect_;
114     std::function<void(const std::string &target, Message *inMsg)> onDispatch_;
115     std::function<void(const std::string &target, const Message *inMsg)> beforeDispatch_;
116     std::string userId_;
117 
    // Send-delay configuration; delayMessageId_ starts as INVALID_MESSAGE_ID.
118     uint32_t sendDelayTime_ = 0;
119     uint32_t delayMessageId_ = INVALID_MESSAGE_ID;
120     uint32_t delayTimes_ = 0; // ms  NOTE(review): the unit may belong to sendDelayTime_ rather than this field -- confirm
121     uint32_t skipTimes_ = 0;
122     std::set<std::string> delayDevices_;
123 
    // Declared mutable so it can be locked from const member functions.
    // NOTE(review): presumably guards localDeviceId_ and getLocalDeviceRet_ -- confirm.
124     mutable std::mutex localDeviceIdMutex_;
125     std::string localDeviceId_;
126     int getLocalDeviceRet_ = E_OK;
127 };
128 } // namespace DistributedDB
129 
130 #endif // VIRTUAL_ICOMMUNICATORAGGREGATOR_H