• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_COMMUNICATOR_H
17 #define VIRTUAL_COMMUNICATOR_H
18 
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <string>

#include "icommunicator.h"
#include "ref_object.h"
#include "serial_buffer.h"
#include "sync_types.h"
31 
32 namespace DistributedDB {
33 class VirtualCommunicatorAggregator;
34 
35 class VirtualCommunicator : public ICommunicator {
36 public:
37     VirtualCommunicator(const std::string &deviceId, VirtualCommunicatorAggregator *communicatorAggregator);
38     ~VirtualCommunicator() override;
39 
40     DISABLE_COPY_ASSIGN_MOVE(VirtualCommunicator);
41 
42     int RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper) override;
43     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
44     int RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper) override;
45 
46     void Activate(const std::string &userId = "") override;
47 
48     uint32_t GetCommunicatorMtuSize() const override;
49     uint32_t GetCommunicatorMtuSize(const std::string &target) const override;
50     void SetCommunicatorMtuSize(uint32_t mtuSize);
51 
52     uint32_t GetTimeout() const override;
53     uint32_t GetTimeout(const std::string &target) const override;
54     void SetTimeout(uint32_t timeout);
55 
56     int GetLocalIdentity(std::string &outTarget) const override;
57 
58     int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override;
59     int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
60         const OnSendEnd &onEnd) override;
61 
62     int GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const override;
63 
64     void CallbackOnMessage(const std::string &srcTarget, Message *inMsg);
65 
66     void CallbackOnConnect(const std::string &target, bool isConnect) const;
67 
68     int GeneralVirtualSyncId();
69 
70     void Disable();
71 
72     void Enable();
73 
74     void SetDeviceId(const std::string &deviceId);
75 
76     std::string GetDeviceId() const;
77 
78     bool IsEnabled() const;
79 
80     bool IsDeviceOnline(const std::string &device) const override;
81 
82     void SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes = 1);
83 
84     void SetRemoteVersion(uint16_t remoteVersion);
85 
86     std::string GetTargetUserId(const ExtendInfo &paramInfo) const override;
87 
88     void SetTargetUserId(const std::string &userId);
89 
90     uint64_t GetSendMsgSize() const;
91 
92     bool ExchangeClosePending(bool expected) override;
93 private:
94     int TranslateMsg(const Message *inMsg, Message *&outMsg);
95 
96     mutable std::mutex onMessageLock_;
97     OnMessageCallback onMessage_;
98 
99     mutable std::mutex onConnectLock_;
100     OnConnectCallback onConnect_;
101     mutable std::mutex devicesMapLock_;
102     mutable std::map<std::string, bool> onlineDevicesMap_;
103 
104     std::string remoteDeviceId_ = "real_device";
105     std::mutex syncIdLock_;
106     int currentSyncId_ = 1000;
107     bool isEnable_ = true;
108     std::string deviceId_;
109 
110     std::mutex onAggregatorLock_;
111     VirtualCommunicatorAggregator *communicatorAggregator_;
112 
113     uint32_t timeout_ = 5 * 1000; // 5 * 1000ms
114     MessageId dropMsgId_ = MessageId::UNKNOW_MESSAGE;
115     uint32_t dropMsgTimes_ = 0;
116     uint32_t mtuSize_ = 5 * 1024 * 1024; // 5 * 1024 * 1024B
117 
118     uint16_t remoteVersion_ = UINT16_MAX;
119 
120     std::string targetUserId_;
121     std::atomic<uint64_t> sendMsgSize_ = 0;
122     std::atomic<bool> dbClosePending_;
123 };
124 } // namespace DistributedDB
125 
126 #endif // VIRTUAL_COMMUNICATOR_H
127