1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "soft_bus_channel.h"
16
17 #include <securec.h>
18
19 #include "constant_common.h"
20 #include "device_info_manager.h"
21 #ifdef EVENTHANDLER_ENABLE
22 #include "access_event_handler.h"
23 #endif
24 #include "token_sync_manager_service.h"
25 #include "singleton.h"
26 #include "soft_bus_manager.h"
27
28 namespace OHOS {
29 namespace Security {
30 namespace AccessToken {
namespace {
// Values of the message "type" field: used when building outgoing messages and when
// dispatching incoming ones in HandleDataReceived.
const std::string REQUEST_TYPE("request");
const std::string RESPONSE_TYPE("response");
// Name under which the delayed close task is posted to / removed from the send event handler.
const std::string TASK_NAME_CLOSE_SESSION("atm_soft_bus_channel_close_session");
// How long ExecuteCommand waits for a response, in milliseconds.
constexpr int32_t EXECUTE_COMMAND_TIME_OUT = 3000;
// Delay before the posted close task runs, in milliseconds.
constexpr int32_t WAIT_SESSION_CLOSE_MILLISECONDS = 5 * 1000;
// send buf size for header: added on top of the payload length when sizing a send buffer
constexpr int RPC_TRANSFER_HEAD_BYTES_LENGTH = 1024 * 256;
// decompress buf size
constexpr int RPC_TRANSFER_BYTES_MAX_LENGTH = 1024 * 1024;
} // namespace
SoftBusChannel(const std::string & deviceId)42 SoftBusChannel::SoftBusChannel(const std::string &deviceId)
43 : deviceId_(deviceId), mutex_(), callbacks_(), responseResult_(""), loadedCond_()
44 {
45 LOGD(ATM_DOMAIN, ATM_TAG, "SoftBusChannel(deviceId)");
46 isDelayClosing_ = false;
47 socketFd_ = Constant::INVALID_SOCKET_FD;
48 isSocketUsing_ = false;
49 }
50
~SoftBusChannel()51 SoftBusChannel::~SoftBusChannel()
52 {
53 LOGD(ATM_DOMAIN, ATM_TAG, "~SoftBusChannel()");
54 }
55
BuildConnection()56 int SoftBusChannel::BuildConnection()
57 {
58 CancelCloseConnectionIfNeeded();
59
60 std::unique_lock<std::mutex> lock(socketMutex_);
61 if (socketFd_ != Constant::INVALID_SOCKET_FD) {
62 LOGI(ATM_DOMAIN, ATM_TAG, "Socket is exist, no need open again.");
63 return Constant::SUCCESS;
64 }
65
66 if (socketFd_ == Constant::INVALID_SOCKET_FD) {
67 LOGI(ATM_DOMAIN, ATM_TAG, "Bind service with device: %{public}s",
68 ConstantCommon::EncryptDevId(deviceId_).c_str());
69 int socket = SoftBusManager::GetInstance().BindService(deviceId_);
70 if (socket == Constant::INVALID_SOCKET_FD) {
71 LOGE(ATM_DOMAIN, ATM_TAG, "Bind service failed.");
72 return Constant::FAILURE;
73 }
74 socketFd_ = socket;
75 }
76 return Constant::SUCCESS;
77 }
78
CloseConnection()79 void SoftBusChannel::CloseConnection()
80 {
81 LOGD(ATM_DOMAIN, ATM_TAG, "Close connection");
82 std::unique_lock<std::mutex> lock(mutex_);
83 if (isDelayClosing_) {
84 return;
85 }
86
87 #ifdef EVENTHANDLER_ENABLE
88 std::shared_ptr<AccessEventHandler> handler =
89 DelayedSingleton<TokenSyncManagerService>::GetInstance()->GetSendEventHandler();
90 if (handler == nullptr) {
91 LOGE(ATM_DOMAIN, ATM_TAG, "Fail to get EventHandler");
92 return;
93 }
94 #endif
95 std::weak_ptr<SoftBusChannel> weakPtr = shared_from_this();
96 std::function<void()> delayed = ([weakPtr]() {
97 auto self = weakPtr.lock();
98 if (self == nullptr) {
99 LOGE(ATM_DOMAIN, ATM_TAG, "SoftBusChannel is nullptr");
100 return;
101 }
102 std::unique_lock<std::mutex> lock(self->socketMutex_);
103 if (self->isSocketUsing_) {
104 LOGD(ATM_DOMAIN, ATM_TAG, "Socket is in using, cancel close socket");
105 } else {
106 SoftBusManager::GetInstance().CloseSocket(self->socketFd_);
107 self->socketFd_ = Constant::INVALID_SESSION;
108 LOGI(ATM_DOMAIN, ATM_TAG, "Close socket for device: %{public}s",
109 ConstantCommon::EncryptDevId(self->deviceId_).c_str());
110 }
111 self->isDelayClosing_ = false;
112 });
113
114 LOGD(ATM_DOMAIN, ATM_TAG, "Close socket after %{public}d ms", WAIT_SESSION_CLOSE_MILLISECONDS);
115 #ifdef EVENTHANDLER_ENABLE
116 handler->ProxyPostTask(delayed, TASK_NAME_CLOSE_SESSION, WAIT_SESSION_CLOSE_MILLISECONDS);
117 #endif
118
119 isDelayClosing_ = true;
120 }
121
Release()122 void SoftBusChannel::Release()
123 {
124 #ifdef EVENTHANDLER_ENABLE
125 std::shared_ptr<AccessEventHandler> handler =
126 DelayedSingleton<TokenSyncManagerService>::GetInstance()->GetSendEventHandler();
127 if (handler == nullptr) {
128 LOGE(ATM_DOMAIN, ATM_TAG, "Fail to get EventHandler");
129 return;
130 }
131 handler->ProxyRemoveTask(TASK_NAME_CLOSE_SESSION);
132 #endif
133 }
134
GetUuid()135 std::string SoftBusChannel::GetUuid()
136 {
137 // to use a lib like libuuid
138 int uuidStrLen = 37; // 32+4+1
139 char uuidbuf[uuidStrLen];
140 RandomUuid(uuidbuf, uuidStrLen);
141 std::string uuid(uuidbuf);
142 LOGD(ATM_DOMAIN, ATM_TAG, "Generated message uuid: %{public}s", ConstantCommon::EncryptDevId(uuid).c_str());
143
144 return uuid;
145 }
146
InsertCallback(int result,std::string & uuid)147 void SoftBusChannel::InsertCallback(int result, std::string &uuid)
148 {
149 std::unique_lock<std::mutex> lock(socketMutex_);
150 std::function<void(const std::string &)> callback = [this](const std::string &result) {
151 responseResult_ = std::string(result);
152 loadedCond_.notify_all();
153 LOGD(ATM_DOMAIN, ATM_TAG, "OnResponse called end");
154 };
155 callbacks_.insert(std::pair<std::string, std::function<void(std::string)>>(uuid, callback));
156
157 isSocketUsing_ = true;
158 lock.unlock();
159 }
160
ExecuteCommand(const std::string & commandName,const std::string & jsonPayload)161 std::string SoftBusChannel::ExecuteCommand(const std::string &commandName, const std::string &jsonPayload)
162 {
163 if (commandName.empty() || jsonPayload.empty()) {
164 LOGE(ATM_DOMAIN, ATM_TAG, "Invalid params, commandName: %{public}s", commandName.c_str());
165 return "";
166 }
167
168 std::string uuid = GetUuid();
169
170 int len = static_cast<int32_t>(RPC_TRANSFER_HEAD_BYTES_LENGTH + jsonPayload.length());
171 unsigned char *buf = new (std::nothrow) unsigned char[len + 1];
172 if (buf == nullptr) {
173 LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory: %{public}d", len);
174 return "";
175 }
176 (void)memset_s(buf, len + 1, 0, len + 1);
177 BytesInfo info;
178 info.bytes = buf;
179 info.bytesLength = len;
180 int result = PrepareBytes(REQUEST_TYPE, uuid, commandName, jsonPayload, info);
181 if (result != Constant::SUCCESS) {
182 delete[] buf;
183 return "";
184 }
185 InsertCallback(result, uuid);
186 int retCode = SendRequestBytes(buf, info.bytesLength);
187 delete[] buf;
188
189 std::unique_lock<std::mutex> lock2(socketMutex_);
190 if (retCode != Constant::SUCCESS) {
191 LOGE(ATM_DOMAIN, ATM_TAG, "Send request data failed: %{public}d ", retCode);
192 callbacks_.erase(uuid);
193 isSocketUsing_ = false;
194 return "";
195 }
196
197 LOGD(ATM_DOMAIN, ATM_TAG, "Wait command response");
198 if (loadedCond_.wait_for(lock2, std::chrono::milliseconds(EXECUTE_COMMAND_TIME_OUT)) == std::cv_status::timeout) {
199 LOGW(ATM_DOMAIN, ATM_TAG, "Time out to wait response.");
200 callbacks_.erase(uuid);
201 isSocketUsing_ = false;
202 return "";
203 }
204
205 isSocketUsing_ = false;
206 return responseResult_;
207 }
208
HandleDataReceived(int socket,const unsigned char * bytes,int length)209 void SoftBusChannel::HandleDataReceived(int socket, const unsigned char *bytes, int length)
210 {
211 LOGD(ATM_DOMAIN, ATM_TAG, "HandleDataReceived");
212 #ifdef DEBUG_API_PERFORMANCE
213 LOGI(ATM_DOMAIN, ATM_TAG, "Api_performance:recieve message from softbus");
214 #endif
215 if (socket <= 0 || length <= 0) {
216 LOGE(ATM_DOMAIN, ATM_TAG, "Invalid params: socket: %{public}d, data length: %{public}d", socket, length);
217 return;
218 }
219 std::string receiveData = Decompress(bytes, length);
220 if (receiveData.empty()) {
221 LOGE(ATM_DOMAIN, ATM_TAG, "Invalid parameter bytes");
222 return;
223 }
224 std::shared_ptr<SoftBusMessage> message = SoftBusMessage::FromJson(receiveData);
225 if (message == nullptr) {
226 LOGD(ATM_DOMAIN, ATM_TAG, "Invalid json string");
227 return;
228 }
229 if (!message->IsValid()) {
230 LOGD(ATM_DOMAIN, ATM_TAG, "Invalid data, has empty field");
231 return;
232 }
233
234 std::string type = message->GetType();
235 if (REQUEST_TYPE == (type)) {
236 std::function<void()> delayed = ([weak = weak_from_this(), socket, message]() {
237 auto self = weak.lock();
238 if (self == nullptr) {
239 LOGE(ATM_DOMAIN, ATM_TAG, "SoftBusChannel is nullptr");
240 return;
241 }
242 self->HandleRequest(socket, message->GetId(), message->GetCommandName(), message->GetJsonPayload());
243 });
244
245 #ifdef EVENTHANDLER_ENABLE
246 std::shared_ptr<AccessEventHandler> handler =
247 DelayedSingleton<TokenSyncManagerService>::GetInstance()->GetRecvEventHandler();
248 if (handler == nullptr) {
249 LOGE(ATM_DOMAIN, ATM_TAG, "Fail to get EventHandler");
250 return;
251 }
252 handler->ProxyPostTask(delayed, "HandleDataReceived_HandleRequest");
253 #endif
254 } else if (RESPONSE_TYPE == (type)) {
255 HandleResponse(message->GetId(), message->GetJsonPayload());
256 } else {
257 LOGE(ATM_DOMAIN, ATM_TAG, "Invalid type: %{public}s ", type.c_str());
258 }
259 }
260
PrepareBytes(const std::string & type,const std::string & id,const std::string & commandName,const std::string & jsonPayload,BytesInfo & info)261 int SoftBusChannel::PrepareBytes(const std::string &type, const std::string &id, const std::string &commandName,
262 const std::string &jsonPayload, BytesInfo &info)
263 {
264 SoftBusMessage messageEntity(type, id, commandName, jsonPayload);
265 std::string json = messageEntity.ToJson();
266 return Compress(json, info.bytes, info.bytesLength);
267 }
268
Compress(const std::string & json,const unsigned char * compressedBytes,int & compressedLength)269 int SoftBusChannel::Compress(const std::string &json, const unsigned char *compressedBytes, int &compressedLength)
270 {
271 uLong len = compressBound(json.size());
272 // length will not so that long
273 if (compressedLength > 0 && static_cast<int32_t>(len) > compressedLength) {
274 LOGE(ATM_DOMAIN, ATM_TAG,
275 "compress error. data length overflow, bound length: %{public}d, buffer length: %{public}d",
276 static_cast<int32_t>(len), compressedLength);
277 return Constant::FAILURE;
278 }
279
280 int result = compress(const_cast<Byte *>(compressedBytes), &len,
281 reinterpret_cast<unsigned char *>(const_cast<char *>(json.c_str())), json.size() + 1);
282 if (result != Z_OK) {
283 LOGE(ATM_DOMAIN, ATM_TAG, "Compress failed! error code: %{public}d", result);
284 return result;
285 }
286 LOGD(ATM_DOMAIN, ATM_TAG, "Compress complete. compress %{public}d bytes to %{public}d", compressedLength,
287 static_cast<int32_t>(len));
288 compressedLength = static_cast<int32_t>(len);
289 return Constant::SUCCESS;
290 }
291
Decompress(const unsigned char * bytes,const int length)292 std::string SoftBusChannel::Decompress(const unsigned char *bytes, const int length)
293 {
294 LOGD(ATM_DOMAIN, ATM_TAG, "Input length: %{public}d", length);
295 uLong len = RPC_TRANSFER_BYTES_MAX_LENGTH;
296 unsigned char *buf = new (std::nothrow) unsigned char[len + 1];
297 if (buf == nullptr) {
298 LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory!");
299 return "";
300 }
301 (void)memset_s(buf, len + 1, 0, len + 1);
302 int result = uncompress(buf, &len, const_cast<unsigned char *>(bytes), length);
303 if (result != Z_OK) {
304 LOGE(ATM_DOMAIN, ATM_TAG,
305 "uncompress failed, error code: %{public}d, bound length: %{public}d, buffer length: %{public}d", result,
306 static_cast<int32_t>(len), length);
307 delete[] buf;
308 return "";
309 }
310 buf[len] = '\0';
311 std::string str(reinterpret_cast<char *>(buf));
312 delete[] buf;
313 return str;
314 }
315
SendRequestBytes(const unsigned char * bytes,const int bytesLength)316 int SoftBusChannel::SendRequestBytes(const unsigned char *bytes, const int bytesLength)
317 {
318 if (bytesLength == 0) {
319 LOGE(ATM_DOMAIN, ATM_TAG, "Bytes data is invalid.");
320 return Constant::FAILURE;
321 }
322
323 std::unique_lock<std::mutex> lock(socketMutex_);
324 if (CheckSessionMayReopenLocked() != Constant::SUCCESS) {
325 LOGE(ATM_DOMAIN, ATM_TAG, "Socket invalid and reopen failed!");
326 return Constant::FAILURE;
327 }
328
329 LOGD(ATM_DOMAIN, ATM_TAG, "Send len (after compress len)= %{public}d", bytesLength);
330 #ifdef DEBUG_API_PERFORMANCE
331 LOGI(ATM_DOMAIN, ATM_TAG, "Api_performance:send command to softbus");
332 #endif
333 int result = ::SendBytes(socketFd_, bytes, bytesLength);
334 if (result != Constant::SUCCESS) {
335 LOGE(ATM_DOMAIN, ATM_TAG, "Fail to send! result= %{public}d", result);
336 return Constant::FAILURE;
337 }
338 LOGD(ATM_DOMAIN, ATM_TAG, "Send successfully.");
339 return Constant::SUCCESS;
340 }
341
CheckSessionMayReopenLocked()342 int SoftBusChannel::CheckSessionMayReopenLocked()
343 {
344 // when socket is opened, we got a valid sessionid, when socket closed, we will reset sessionid.
345 if (IsSessionAvailable()) {
346 return Constant::SUCCESS;
347 }
348 int socket = SoftBusManager::GetInstance().BindService(deviceId_);
349 if (socket != Constant::INVALID_SESSION) {
350 socketFd_ = socket;
351 return Constant::SUCCESS;
352 }
353 return Constant::FAILURE;
354 }
355
IsSessionAvailable()356 bool SoftBusChannel::IsSessionAvailable()
357 {
358 return socketFd_ > Constant::INVALID_SESSION;
359 }
360
CancelCloseConnectionIfNeeded()361 void SoftBusChannel::CancelCloseConnectionIfNeeded()
362 {
363 std::unique_lock<std::mutex> lock(mutex_);
364 if (!isDelayClosing_) {
365 return;
366 }
367 LOGD(ATM_DOMAIN, ATM_TAG, "Cancel close connection");
368
369 Release();
370 isDelayClosing_ = false;
371 }
372
HandleRequest(int socket,const std::string & id,const std::string & commandName,const std::string & jsonPayload)373 void SoftBusChannel::HandleRequest(int socket, const std::string &id, const std::string &commandName,
374 const std::string &jsonPayload)
375 {
376 std::shared_ptr<BaseRemoteCommand> command =
377 RemoteCommandFactory::GetInstance().NewRemoteCommandFromJson(commandName, jsonPayload);
378 if (command == nullptr) {
379 // send result back directly
380 LOGW(ATM_DOMAIN, ATM_TAG, "Command %{public}s cannot get from json", commandName.c_str());
381
382 int sendlen = static_cast<int32_t>(RPC_TRANSFER_HEAD_BYTES_LENGTH + jsonPayload.length());
383 unsigned char *sendbuf = new (std::nothrow) unsigned char[sendlen + 1];
384 if (sendbuf == nullptr) {
385 LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory: %{public}d", sendlen);
386 return;
387 }
388 (void)memset_s(sendbuf, sendlen + 1, 0, sendlen + 1);
389 BytesInfo info;
390 info.bytes = sendbuf;
391 info.bytesLength = sendlen;
392 int sendResult = PrepareBytes(RESPONSE_TYPE, id, commandName, jsonPayload, info);
393 if (sendResult != Constant::SUCCESS) {
394 delete[] sendbuf;
395 return;
396 }
397 int sendResultCode = SendResponseBytes(socket, sendbuf, info.bytesLength);
398 delete[] sendbuf;
399 LOGD(ATM_DOMAIN, ATM_TAG, "Send response result= %{public}d ", sendResultCode);
400 return;
401 }
402
403 // execute command
404 command->Execute();
405 LOGD(ATM_DOMAIN, ATM_TAG, "Command uniqueId: %{public}s, finish with status: %{public}d, message: %{public}s",
406 ConstantCommon::EncryptDevId(command->remoteProtocol_.uniqueId).c_str(), command->remoteProtocol_.statusCode,
407 command->remoteProtocol_.message.c_str());
408
409 // send result back
410 std::string resultJsonPayload = command->ToJsonPayload();
411 int len = static_cast<int32_t>(RPC_TRANSFER_HEAD_BYTES_LENGTH + resultJsonPayload.length());
412 unsigned char *buf = new (std::nothrow) unsigned char[len + 1];
413 if (buf == nullptr) {
414 LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory: %{public}d", len);
415 return;
416 }
417 (void)memset_s(buf, len + 1, 0, len + 1);
418 BytesInfo info;
419 info.bytes = buf;
420 info.bytesLength = len;
421 int result = PrepareBytes(RESPONSE_TYPE, id, commandName, resultJsonPayload, info);
422 if (result != Constant::SUCCESS) {
423 delete[] buf;
424 return;
425 }
426 int retCode = SendResponseBytes(socket, buf, info.bytesLength);
427 delete[] buf;
428 LOGD(ATM_DOMAIN, ATM_TAG, "Send response result= %{public}d", retCode);
429 }
430
HandleResponse(const std::string & id,const std::string & jsonPayload)431 void SoftBusChannel::HandleResponse(const std::string &id, const std::string &jsonPayload)
432 {
433 std::unique_lock<std::mutex> lock(socketMutex_);
434 auto callback = callbacks_.find(id);
435 if (callback != callbacks_.end()) {
436 (callback->second)(jsonPayload);
437 callbacks_.erase(callback);
438 }
439 }
440
SendResponseBytes(int socket,const unsigned char * bytes,const int bytesLength)441 int SoftBusChannel::SendResponseBytes(int socket, const unsigned char *bytes, const int bytesLength)
442 {
443 LOGD(ATM_DOMAIN, ATM_TAG, "Send len (after compress len)= %{public}d", bytesLength);
444 int result = ::SendBytes(socket, bytes, bytesLength);
445 if (result != Constant::SUCCESS) {
446 LOGE(ATM_DOMAIN, ATM_TAG, "Fail to send! result= %{public}d", result);
447 return Constant::FAILURE;
448 }
449 LOGD(ATM_DOMAIN, ATM_TAG, "Send successfully.");
450 return Constant::SUCCESS;
451 }
452
FromJson(const std::string & jsonString)453 std::shared_ptr<SoftBusMessage> SoftBusMessage::FromJson(const std::string &jsonString)
454 {
455 CJsonUnique json = CreateJsonFromString(jsonString);
456 if (json == nullptr || cJSON_IsObject(json.get()) == false) {
457 LOGE(ATM_DOMAIN, ATM_TAG, "Failed to parse jsonString");
458 return nullptr;
459 }
460
461 std::string type;
462 std::string id;
463 std::string commandName;
464 std::string jsonPayload;
465 GetStringFromJson(json.get(), "type", type);
466 GetStringFromJson(json.get(), "id", id);
467 GetStringFromJson(json.get(), "commandName", commandName);
468 GetStringFromJson(json.get(), "jsonPayload", jsonPayload);
469 std::shared_ptr<SoftBusMessage> message = std::make_shared<SoftBusMessage>(type, id, commandName, jsonPayload);
470 return message;
471 }
472 } // namespace AccessToken
473 } // namespace Security
474 } // namespace OHOS
475