• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_ICOMMUNICATORAGGREGATOR_H
17 #define VIRTUAL_ICOMMUNICATORAGGREGATOR_H
18 
19 #include <cstdint>
20 #include <set>
21 
22 #include "icommunicator_aggregator.h"
23 #include "virtual_communicator.h"
24 
25 namespace DistributedDB {
26 class ICommunicator;  // Forward Declaration
27 
28 class VirtualCommunicatorAggregator : public ICommunicatorAggregator {
29 public:
30     // Return 0 as success. Return negative as error
31     int Initialize(IAdapter *inAdapter) override;
32 
33     void Finalize() override;
34 
35     // If not success, return nullptr and set outErrorNo
36     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override;
37     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override;
38 
39     void ReleaseCommunicator(ICommunicator *inCommunicator) override;
40 
41     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
42     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
43     void RunCommunicatorLackCallback(const LabelType &commLabel);
44     void RunOnConnectCallback(const std::string &target, bool isConnect);
45 
46     int GetLocalIdentity(std::string &outTarget) const override;
47 
48     // online a virtual device to the VirtualCommunicator, should call in main thread
49     void OnlineDevice(const std::string &deviceId) const;
50 
51     // offline a virtual device to the VirtualCommunicator, should call in main thread
52     void OfflineDevice(const std::string &deviceId) const;
53 
54     void DispatchMessage(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
55         const OnSendEnd &onEnd);
56 
57     // If not success, return nullptr and set outErrorNo
58     ICommunicator *AllocCommunicator(const std::string &deviceId, int &outErrorNo);
59 
60     ICommunicator *GetCommunicator(const std::string &deviceId) const;
61 
62     void Disable();
63 
64     void Enable();
65 
66     void SetBlockValue(bool value);
67 
68     bool GetBlockValue() const;
69 
70     void RegOnDispatch(const std::function<void(const std::string &target, Message *inMsg)> &onDispatch);
71 
72     void SetCurrentUserId(const std::string &userId);
73 
74     void SetTimeout(const std::string &deviceId, uint32_t timeout);
75 
76     void SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, uint32_t dropTimes = 1);
77 
78     void SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize);
79 
80     void SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId, uint32_t delayTimes, uint32_t skipTimes,
81         std::set<std::string> &delayDevices);
82     void ResetSendDelayInfo();
83 
84     std::set<std::string> GetOnlineDevices();
85 
86     void DisableCommunicator();
87 
88     void EnableCommunicator();
89 
~VirtualCommunicatorAggregator()90     ~VirtualCommunicatorAggregator() {};
VirtualCommunicatorAggregator()91     VirtualCommunicatorAggregator() {};
92 
93 private:
94     void CallSendEnd(int errCode, const OnSendEnd &onEnd);
95     void DelayTimeHandle(uint32_t messageId, const std::string &dstTarget);
96 
97     mutable std::mutex communicatorsLock_;
98     std::map<std::string, VirtualCommunicator *> communicators_;
99     std::string remoteDeviceId_ = "real_device";
100     std::mutex blockLock_;
101     std::condition_variable conditionVar_;
102     bool isEnable_ = true;
103     bool isBlock_ = false;
104     CommunicatorLackCallback onCommLack_;
105     OnConnectCallback onConnect_;
106     std::function<void(const std::string &target, Message *inMsg)> onDispatch_;
107     std::string userId_;
108 
109     uint32_t sendDelayTime_ = 0;
110     uint32_t delayMessageId_ = INVALID_MESSAGE_ID;
111     uint32_t delayTimes_ = 0; // ms
112     uint32_t skipTimes_ = 0;
113     std::set<std::string> delayDevices_;
114 };
115 } // namespace DistributedDB
116 
117 #endif // VIRTUAL_ICOMMUNICATORAGGREGATOR_H