1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "process_communicator_impl.h"

#include <limits>

#include <logger.h>
19
20 namespace OHOS {
21 namespace ObjectStore {
22 using namespace DistributedDB;
ProcessCommunicatorImpl()23 ProcessCommunicatorImpl::ProcessCommunicatorImpl()
24 {
25 }
26
~ProcessCommunicatorImpl()27 ProcessCommunicatorImpl::~ProcessCommunicatorImpl()
28 {
29 LOG_ERROR("destructor.");
30 }
31
Start(const std::string & processLabel)32 DBStatus ProcessCommunicatorImpl::Start(const std::string &processLabel)
33 {
34 LOG_INFO("init commProvider");
35 thisProcessLabel_ = processLabel;
36 PipeInfo pi = { thisProcessLabel_ };
37 Status errCode = CommunicationProvider::GetInstance().Start(pi);
38 if (errCode != Status::SUCCESS) {
39 LOG_ERROR("commProvider_ Start Fail.");
40 return DBStatus::DB_ERROR;
41 }
42 return DBStatus::OK;
43 }
44
Stop()45 DBStatus ProcessCommunicatorImpl::Stop()
46 {
47 PipeInfo pi = { thisProcessLabel_ };
48 Status errCode = CommunicationProvider::GetInstance().Stop(pi);
49 if (errCode != Status::SUCCESS) {
50 LOG_ERROR("commProvider_ Stop Fail.");
51 return DBStatus::DB_ERROR;
52 }
53 return DBStatus::OK;
54 }
55
RegOnDeviceChange(const OnDeviceChange & callback)56 DBStatus ProcessCommunicatorImpl::RegOnDeviceChange(const OnDeviceChange &callback)
57 {
58 {
59 std::lock_guard<std::mutex> onDeviceChangeLockGard(onDeviceChangeMutex_);
60 onDeviceChangeHandler_ = callback;
61 }
62
63 PipeInfo pi = { thisProcessLabel_ };
64 if (callback) {
65 Status errCode = CommunicationProvider::GetInstance().StartWatchDeviceChange(this, pi);
66 if (errCode != Status::SUCCESS) {
67 LOG_ERROR("commProvider_ StartWatchDeviceChange Fail.");
68 return DBStatus::DB_ERROR;
69 }
70 } else {
71 Status errCode = CommunicationProvider::GetInstance().StopWatchDeviceChange(this, pi);
72 if (errCode != Status::SUCCESS) {
73 LOG_ERROR("commProvider_ StopWatchDeviceChange Fail.");
74 return DBStatus::DB_ERROR;
75 }
76 }
77
78 return DBStatus::OK;
79 }
80
RegOnDataReceive(const OnDataReceive & callback)81 DBStatus ProcessCommunicatorImpl::RegOnDataReceive(const OnDataReceive &callback)
82 {
83 {
84 std::lock_guard<std::mutex> onDataReceiveLockGard(onDataReceiveMutex_);
85 onDataReceiveHandler_ = callback;
86 }
87
88 PipeInfo pi = { thisProcessLabel_ };
89 if (callback) {
90 Status errCode = CommunicationProvider::GetInstance().StartWatchDataChange(this, pi);
91 if (errCode != Status::SUCCESS) {
92 LOG_ERROR("commProvider_ StartWatchDataChange Fail.");
93 return DBStatus::DB_ERROR;
94 }
95 } else {
96 Status errCode = CommunicationProvider::GetInstance().StopWatchDataChange(this, pi);
97 if (errCode != Status::SUCCESS) {
98 LOG_ERROR("commProvider_ StopWatchDataChange Fail.");
99 return DBStatus::DB_ERROR;
100 }
101 }
102 return DBStatus::OK;
103 }
104
SendData(const DeviceInfos & dstDevInfo,const uint8_t * data,uint32_t length)105 DBStatus ProcessCommunicatorImpl::SendData(const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length)
106 {
107 PipeInfo pi = { thisProcessLabel_ };
108 DeviceId destination;
109 destination.deviceId = dstDevInfo.identifier;
110 Status errCode = CommunicationProvider::GetInstance().SendData(pi, destination, data, static_cast<int>(length));
111 if (errCode != Status::SUCCESS) {
112 LOG_ERROR("commProvider_ SendData Fail.");
113 return DBStatus::DB_ERROR;
114 }
115
116 return DBStatus::OK;
117 }
118
GetMtuSize()119 uint32_t ProcessCommunicatorImpl::GetMtuSize()
120 {
121 return MTU_SIZE;
122 }
123
GetMtuSize(const DeviceInfos & devInfo)124 uint32_t ProcessCommunicatorImpl::GetMtuSize(const DeviceInfos &devInfo)
125 {
126 LOG_INFO("GetMtuSize start");
127 std::vector<DeviceInfo> devInfos = CommunicationProvider::GetInstance().GetDeviceList();
128 for (auto const &entry : devInfos) {
129 LOG_INFO("GetMtuSize deviceType: %{public}s", entry.deviceType.c_str());
130 bool isWatch = (entry.deviceType == SMART_WATCH_TYPE || entry.deviceType == CHILDREN_WATCH_TYPE);
131 if (entry.deviceId == devInfo.identifier && isWatch) {
132 return MTU_SIZE_WATCH;
133 }
134 }
135 return MTU_SIZE;
136 }
137
GetLocalDeviceInfos()138 DeviceInfos ProcessCommunicatorImpl::GetLocalDeviceInfos()
139 {
140 DeviceInfos localDevInfos;
141 DeviceInfo devInfo = CommunicationProvider::GetInstance().GetLocalDevice();
142 localDevInfos.identifier = devInfo.deviceId;
143 return localDevInfos;
144 }
145
GetRemoteOnlineDeviceInfosList()146 std::vector<DeviceInfos> ProcessCommunicatorImpl::GetRemoteOnlineDeviceInfosList()
147 {
148 std::vector<DeviceInfos> remoteDevInfos;
149 std::vector<DeviceInfo> devInfoVec = CommunicationProvider::GetInstance().GetDeviceList();
150 for (auto const &entry : devInfoVec) {
151 DeviceInfos remoteDev;
152 remoteDev.identifier = entry.deviceId;
153 remoteDevInfos.push_back(remoteDev);
154 }
155 return remoteDevInfos;
156 }
157
IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos & peerDevInfo)158 bool ProcessCommunicatorImpl::IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo)
159 {
160 PipeInfo pi = { thisProcessLabel_ };
161 DeviceId di = { peerDevInfo.identifier };
162 return CommunicationProvider::GetInstance().IsSameStartedOnPeer(pi, di);
163 }
164
OnMessage(const DeviceInfo & info,const uint8_t * ptr,const int size,const PipeInfo & pipeInfo) const165 void ProcessCommunicatorImpl::OnMessage(
166 const DeviceInfo &info, const uint8_t *ptr, const int size, __attribute__((unused)) const PipeInfo &pipeInfo) const
167 {
168 std::lock_guard<std::mutex> onDataReceiveLockGuard(onDataReceiveMutex_);
169 if (onDataReceiveHandler_ == nullptr) {
170 LOG_ERROR("onDataReceiveHandler_ invalid.");
171 return;
172 }
173 DeviceInfos devInfo;
174 devInfo.identifier = info.deviceId;
175 onDataReceiveHandler_(devInfo, ptr, static_cast<uint32_t>(size));
176 }
177
OnDeviceChanged(const DeviceInfo & info,const DeviceChangeType & type) const178 void ProcessCommunicatorImpl::OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const
179 {
180 std::lock_guard<std::mutex> onDeviceChangeLockGuard(onDeviceChangeMutex_);
181 if (onDeviceChangeHandler_ == nullptr) {
182 LOG_ERROR("onDeviceChangeHandler_ invalid.");
183 return;
184 }
185 DeviceInfos devInfo;
186 devInfo.identifier = info.deviceId;
187 onDeviceChangeHandler_(devInfo, (type == DeviceChangeType::DEVICE_ONLINE));
188 }
189 } // namespace ObjectStore
190 } // namespace OHOS
191