• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_ICOMMUNICATORAGGREGATOR_H
17 #define VIRTUAL_ICOMMUNICATORAGGREGATOR_H
18 
19 #include <cstdint>
20 #include <set>
21 
22 #include "icommunicator_aggregator.h"
23 #include "virtual_communicator.h"
24 
25 namespace DistributedDB {
26 class ICommunicator;  // Forward Declaration
27 
28 class VirtualCommunicatorAggregator : public ICommunicatorAggregator {
29 public:
30     // Return 0 as success. Return negative as error
31     int Initialize(IAdapter *inAdapter) override;
32 
33     void Finalize() override;
34 
35     // If not success, return nullptr and set outErrorNo
36     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override;
37     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override;
38 
39     void ReleaseCommunicator(ICommunicator *inCommunicator) override;
40 
41     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
42     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
43     void RunCommunicatorLackCallback(const LabelType &commLabel);
44     void RunOnConnectCallback(const std::string &target, bool isConnect);
45 
46     int GetLocalIdentity(std::string &outTarget) const override;
47 
48     // online a virtual device to the VirtualCommunicator, should call in main thread
49     void OnlineDevice(const std::string &deviceId) const;
50 
51     // offline a virtual device to the VirtualCommunicator, should call in main thread
52     void OfflineDevice(const std::string &deviceId) const;
53 
54     void DispatchMessage(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
55         const OnSendEnd &onEnd);
56 
57     // If not success, return nullptr and set outErrorNo
58     ICommunicator *AllocCommunicator(const std::string &deviceId, int &outErrorNo);
59 
60     ICommunicator *GetCommunicator(const std::string &deviceId) const;
61 
62     void Disable();
63 
64     void Enable();
65 
66     void SetBlockValue(bool value);
67 
68     bool GetBlockValue() const;
69 
70     void RegOnDispatch(const std::function<void(const std::string &target, Message *inMsg)> &onDispatch);
71 
72     void SetCurrentUserId(const std::string &userId);
73 
74     void SetTimeout(const std::string &deviceId, uint32_t timeout);
75 
76     void SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, uint32_t dropTimes = 1);
77 
78     void SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize);
79 
80     void SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId, uint32_t delayTimes, uint32_t skipTimes,
81         std::set<std::string> &delayDevices);
82     void ResetSendDelayInfo();
83 
~VirtualCommunicatorAggregator()84     ~VirtualCommunicatorAggregator() {};
VirtualCommunicatorAggregator()85     VirtualCommunicatorAggregator() {};
86 
87 private:
88     void CallSendEnd(int errCode, const OnSendEnd &onEnd);
89     void DelayTimeHandle(uint32_t messageId, const std::string &dstTarget);
90 
91     mutable std::mutex communicatorsLock_;
92     std::map<std::string, VirtualCommunicator *> communicators_;
93     std::string remoteDeviceId_ = "real_device";
94     std::mutex blockLock_;
95     std::condition_variable conditionVar_;
96     bool isEnable_ = true;
97     bool isBlock_ = false;
98     CommunicatorLackCallback onCommLack_;
99     OnConnectCallback onConnect_;
100     std::function<void(const std::string &target, Message *inMsg)> onDispatch_;
101     std::string userId_;
102 
103     uint32_t sendDelayTime_ = 0;
104     uint32_t delayMessageId_ = INVALID_MESSAGE_ID;
105     uint32_t delayTimes_ = 0; // ms
106     uint32_t skipTimes_ = 0;
107     std::set<std::string> delayDevices_;
108 };
109 } // namespace DistributedDB
110 
111 #endif // VIRTUAL_ICOMMUNICATORAGGREGATOR_H