1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "SoftBusClient"
17 #include "softbus_client.h"
18
19 #include "communicator_context.h"
20 #include "communication/connect_manager.h"
21 #include "device_manager_adapter.h"
22 #include "inner_socket.h"
23 #include "kvstore_utils.h"
24 #include "log_print.h"
25 #include "softbus_error_code.h"
26
27 namespace OHOS::AppDistributedKv {
28 using namespace OHOS::DistributedKv;
29 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
30 using Context = DistributedData::CommunicatorContext;
SoftBusClient(const PipeInfo & pipeInfo,const DeviceId & deviceId,const std::string & networkId,uint32_t type)31 SoftBusClient::SoftBusClient(const PipeInfo& pipeInfo, const DeviceId& deviceId, const std::string& networkId,
32 uint32_t type) : type_(type), pipe_(pipeInfo), device_(deviceId), networkId_(networkId)
33 {
34 mtu_ = DEFAULT_MTU_SIZE;
35 }
36
~SoftBusClient()37 SoftBusClient::~SoftBusClient()
38 {
39 ZLOGI("Shutdown socket:%{public}d, deviceId:%{public}s", socket_,
40 KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str());
41 if (socket_ > 0) {
42 Shutdown(socket_);
43 }
44 }
45
operator ==(int32_t socket) const46 bool SoftBusClient::operator==(int32_t socket) const
47 {
48 return socket_ == socket;
49 }
50
operator ==(const std::string & deviceId) const51 bool SoftBusClient::operator==(const std::string &deviceId) const
52 {
53 return device_.deviceId == deviceId;
54 }
55
GetMtuSize() const56 uint32_t SoftBusClient::GetMtuSize() const
57 {
58 ZLOGD("get mtu size socket:%{public}d mtu:%{public}d", socket_, mtu_);
59 return mtu_;
60 }
61
GetTimeout() const62 uint32_t SoftBusClient::GetTimeout() const
63 {
64 return DEFAULT_TIMEOUT;
65 }
66
SendData(const DataInfo & dataInfo,const ISocketListener * listener)67 Status SoftBusClient::SendData(const DataInfo &dataInfo, const ISocketListener *listener)
68 {
69 std::lock_guard<std::mutex> lock(mutex_);
70 auto result = CheckStatus();
71 if (result != Status::SUCCESS) {
72 return result;
73 }
74 ZLOGD("send data socket:%{public}d, data size:%{public}u.", socket_, dataInfo.length);
75 int32_t ret = SendBytes(socket_, dataInfo.data, dataInfo.length);
76 if (ret != SOFTBUS_OK) {
77 expireTime_ = std::chrono::steady_clock::now();
78 ZLOGE("send data to socket%{public}d failed, ret:%{public}d.", socket_, ret);
79 softBusError_ = ret;
80 return Status::ERROR;
81 }
82 softBusError_ = 0;
83 expireTime_ = CalcExpireTime();
84 return Status::SUCCESS;
85 }
86
GetSoftBusError()87 int32_t SoftBusClient::GetSoftBusError()
88 {
89 std::lock_guard<std::mutex> lock(mutex_);
90 return softBusError_;
91 }
92
OpenConnect(const ISocketListener * listener)93 Status SoftBusClient::OpenConnect(const ISocketListener *listener)
94 {
95 std::lock_guard<std::mutex> lock(mutex_);
96 auto status = CheckStatus();
97 if (status == Status::SUCCESS || status == Status::RATE_LIMIT) {
98 return status;
99 }
100 if (isOpening_.exchange(true)) {
101 return Status::RATE_LIMIT;
102 }
103 int32_t clientSocket = CreateSocket();
104 if (clientSocket <= 0) {
105 isOpening_.store(false);
106 return Status::NETWORK_ERROR;
107 }
108 auto task = [type = type_, clientSocket, listener, client = shared_from_this()]() {
109 if (client == nullptr) {
110 ZLOGE("OpenSessionByAsync client is nullptr.");
111 return;
112 }
113 ZLOGI("Bind Start, device:%{public}s socket:%{public}d type:%{public}u",
114 KvStoreUtils::ToBeAnonymous(client->device_.deviceId).c_str(), clientSocket, type);
115 int32_t status = client->Open(clientSocket, type, listener);
116 Context::GetInstance().NotifySessionReady(client->device_.deviceId, status);
117 client->isOpening_.store(false);
118 };
119 Context::GetInstance().GetThreadPool()->Execute(task);
120 return Status::RATE_LIMIT;
121 }
122
CreateSocket() const123 int32_t SoftBusClient::CreateSocket() const
124 {
125 SocketInfo socketInfo;
126 std::string peerName = pipe_.pipeId;
127 socketInfo.peerName = const_cast<char *>(peerName.c_str());
128 socketInfo.peerNetworkId = const_cast<char *>(networkId_.c_str());
129 std::string clientName = pipe_.pipeId;
130 socketInfo.name = const_cast<char *>(clientName.c_str());
131 std::string pkgName = "ohos.distributeddata";
132 socketInfo.pkgName = pkgName.data();
133 socketInfo.dataType = DATA_TYPE_BYTES;
134 int32_t socket = Socket(socketInfo);
135 if (socket <= 0) {
136 ZLOGE("Create the client Socket:%{public}d failed, peerName:%{public}s", socket, socketInfo.peerName);
137 }
138 return socket;
139 }
140
CheckStatus()141 Status SoftBusClient::CheckStatus()
142 {
143 if (bindState_ == 0) {
144 return Status::SUCCESS;
145 }
146 if (isOpening_.load()) {
147 return Status::RATE_LIMIT;
148 }
149 if (bindState_ == 0) {
150 return Status::SUCCESS;
151 }
152 return Status::ERROR;
153 }
154
Open(int32_t socket,uint32_t type,const ISocketListener * listener,bool async)155 int32_t SoftBusClient::Open(int32_t socket, uint32_t type, const ISocketListener *listener, bool async)
156 {
157 int32_t status = ::Bind(socket, QOS_INFOS[type % QOS_BUTT], QOS_COUNTS[type % QOS_BUTT], listener);
158 ZLOGI("Bind %{public}s,session:%{public}s,socketId:%{public}d",
159 KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket);
160
161 if (status != 0) {
162 ZLOGE("[Bind] device:%{public}s socket failed, session:%{public}s,result:%{public}d",
163 KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), status);
164 ::Shutdown(socket);
165 return status;
166 }
167 UpdateExpireTime(async);
168 uint32_t mtu = 0;
169 std::tie(status, mtu) = GetMtu(socket);
170 if (status != SOFTBUS_OK) {
171 ZLOGE("GetMtu failed, session:%{public}s, socket:%{public}d, status:%{public}d", pipe_.pipeId.c_str(), socket_,
172 status);
173 ::Shutdown(socket);
174 return status;
175 }
176 UpdateBindInfo(socket, mtu, status, async);
177 ZLOGI("open %{public}s, session:%{public}s success, socket:%{public}d",
178 KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket_);
179 ConnectManager::GetInstance()->OnSessionOpen(networkId_);
180 return status;
181 }
182
GetExpireTime() const183 SoftBusClient::Time SoftBusClient::GetExpireTime() const
184 {
185 std::lock_guard<std::mutex> lock(mutex_);
186 return expireTime_;
187 }
188
GetSocket() const189 int32_t SoftBusClient::GetSocket() const
190 {
191 return socket_;
192 }
193
UpdateExpireTime(bool async)194 void SoftBusClient::UpdateExpireTime(bool async)
195 {
196 auto expireTime = CalcExpireTime();
197 if (async) {
198 std::lock_guard<std::mutex> lock(mutex_);
199 if (expireTime > expireTime_) {
200 expireTime_ = expireTime;
201 }
202 } else {
203 if (expireTime > expireTime_) {
204 expireTime_ = expireTime;
205 }
206 }
207 }
208
UpdateBindInfo(int32_t socket,uint32_t mtu,int32_t status,bool async)209 void SoftBusClient::UpdateBindInfo(int32_t socket, uint32_t mtu, int32_t status, bool async)
210 {
211 if (async) {
212 std::lock_guard<std::mutex> lock(mutex_);
213 socket_ = socket;
214 mtu_ = mtu;
215 bindState_ = status;
216 } else {
217 socket_ = socket;
218 mtu_ = mtu;
219 bindState_ = status;
220 }
221 }
222
GetMtu(int32_t socket)223 std::pair<int32_t, uint32_t> SoftBusClient::GetMtu(int32_t socket)
224 {
225 uint32_t mtu = 0;
226 auto ret = ::GetMtuSize(socket, &mtu);
227 return { ret, mtu };
228 }
229
GetQoSType() const230 uint32_t SoftBusClient::GetQoSType() const
231 {
232 return type_ % QOS_BUTT;
233 }
234
CalcExpireTime() const235 SoftBusClient::Time SoftBusClient::CalcExpireTime() const
236 {
237 auto delay = type_ == QOS_BR ? BR_CLOSE_DELAY : HML_CLOSE_DELAY;
238 return std::chrono::steady_clock::now() + delay;
239 }
240
ReuseConnect(const ISocketListener * listener)241 Status SoftBusClient::ReuseConnect(const ISocketListener *listener)
242 {
243 std::lock_guard<std::mutex> lock(mutex_);
244 auto checkStatus = CheckStatus();
245 if (checkStatus == Status::SUCCESS) {
246 UpdateExpireTime(false);
247 return Status::SUCCESS;
248 }
249 int32_t socket = CreateSocket();
250 if (socket <= 0) {
251 return Status::NETWORK_ERROR;
252 }
253 ZLOGI("Reuse Start, device:%{public}s session:%{public}s socket:%{public}d",
254 KvStoreUtils::ToBeAnonymous(device_.deviceId).c_str(), pipe_.pipeId.c_str(), socket);
255 int32_t status = Open(socket, QOS_REUSE, listener, false);
256 return status == SOFTBUS_OK ? Status::SUCCESS : Status::NETWORK_ERROR;
257 }
258
GetNetworkId() const259 const std::string& SoftBusClient::GetNetworkId() const
260 {
261 return networkId_;
262 }
263 } // namespace OHOS::AppDistributedKv