• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "soft_bus_channel.h"
16 
17 #include <securec.h>
18 
19 #include "constant_common.h"
20 #include "device_info_manager.h"
21 #ifdef EVENTHANDLER_ENABLE
22 #include "access_event_handler.h"
23 #endif
24 #include "token_sync_manager_service.h"
25 #include "singleton.h"
26 #include "soft_bus_manager.h"
27 
28 namespace OHOS {
29 namespace Security {
30 namespace AccessToken {
31 namespace {
32 static const std::string REQUEST_TYPE = "request";
33 static const std::string RESPONSE_TYPE = "response";
34 static const std::string TASK_NAME_CLOSE_SESSION = "atm_soft_bus_channel_close_session";
35 static const int32_t EXECUTE_COMMAND_TIME_OUT = 3000;
36 static const int32_t WAIT_SESSION_CLOSE_MILLISECONDS = 5 * 1000;
37 // send buf size for header
38 static const int RPC_TRANSFER_HEAD_BYTES_LENGTH = 1024 * 256;
39 // decompress buf size
40 static const int RPC_TRANSFER_BYTES_MAX_LENGTH = 1024 * 1024;
41 } // namespace
SoftBusChannel(const std::string & deviceId)42 SoftBusChannel::SoftBusChannel(const std::string &deviceId)
43     : deviceId_(deviceId), mutex_(), callbacks_(), responseResult_(""), loadedCond_()
44 {
45     LOGD(ATM_DOMAIN, ATM_TAG, "SoftBusChannel(deviceId)");
46     isDelayClosing_ = false;
47     socketFd_ = Constant::INVALID_SOCKET_FD;
48     isSocketUsing_ = false;
49 }
50 
~SoftBusChannel()51 SoftBusChannel::~SoftBusChannel()
52 {
53     LOGD(ATM_DOMAIN, ATM_TAG, "~SoftBusChannel()");
54 }
55 
BuildConnection()56 int SoftBusChannel::BuildConnection()
57 {
58     CancelCloseConnectionIfNeeded();
59 
60     std::unique_lock<std::mutex> lock(socketMutex_);
61     if (socketFd_ != Constant::INVALID_SOCKET_FD) {
62         LOGI(ATM_DOMAIN, ATM_TAG, "Socket is exist, no need open again.");
63         return Constant::SUCCESS;
64     }
65 
66     if (socketFd_ == Constant::INVALID_SOCKET_FD) {
67         LOGI(ATM_DOMAIN, ATM_TAG, "Bind service with device: %{public}s",
68             ConstantCommon::EncryptDevId(deviceId_).c_str());
69         int socket = SoftBusManager::GetInstance().BindService(deviceId_);
70         if (socket == Constant::INVALID_SOCKET_FD) {
71             LOGE(ATM_DOMAIN, ATM_TAG, "Bind service failed.");
72             return Constant::FAILURE;
73         }
74         socketFd_ = socket;
75     }
76     return Constant::SUCCESS;
77 }
78 
CloseConnection()79 void SoftBusChannel::CloseConnection()
80 {
81     LOGD(ATM_DOMAIN, ATM_TAG, "Close connection");
82     std::unique_lock<std::mutex> lock(mutex_);
83     if (isDelayClosing_) {
84         return;
85     }
86 
87 #ifdef EVENTHANDLER_ENABLE
88     std::shared_ptr<AccessEventHandler> handler =
89         DelayedSingleton<TokenSyncManagerService>::GetInstance()->GetSendEventHandler();
90     if (handler == nullptr) {
91         LOGE(ATM_DOMAIN, ATM_TAG, "Fail to get EventHandler");
92         return;
93     }
94 #endif
95     std::weak_ptr<SoftBusChannel> weakPtr = shared_from_this();
96     std::function<void()> delayed = ([weakPtr]() {
97         auto self = weakPtr.lock();
98         if (self == nullptr) {
99             LOGE(ATM_DOMAIN, ATM_TAG, "SoftBusChannel is nullptr");
100             return;
101         }
102         std::unique_lock<std::mutex> lock(self->socketMutex_);
103         if (self->isSocketUsing_) {
104             LOGD(ATM_DOMAIN, ATM_TAG, "Socket is in using, cancel close socket");
105         } else {
106             SoftBusManager::GetInstance().CloseSocket(self->socketFd_);
107             self->socketFd_ = Constant::INVALID_SESSION;
108             LOGI(ATM_DOMAIN, ATM_TAG, "Close socket for device: %{public}s",
109                 ConstantCommon::EncryptDevId(self->deviceId_).c_str());
110         }
111         self->isDelayClosing_ = false;
112     });
113 
114     LOGD(ATM_DOMAIN, ATM_TAG, "Close socket after %{public}d ms", WAIT_SESSION_CLOSE_MILLISECONDS);
115 #ifdef EVENTHANDLER_ENABLE
116     handler->ProxyPostTask(delayed, TASK_NAME_CLOSE_SESSION, WAIT_SESSION_CLOSE_MILLISECONDS);
117 #endif
118 
119     isDelayClosing_ = true;
120 }
121 
Release()122 void SoftBusChannel::Release()
123 {
124 #ifdef EVENTHANDLER_ENABLE
125     std::shared_ptr<AccessEventHandler> handler =
126         DelayedSingleton<TokenSyncManagerService>::GetInstance()->GetSendEventHandler();
127     if (handler == nullptr) {
128         LOGE(ATM_DOMAIN, ATM_TAG, "Fail to get EventHandler");
129         return;
130     }
131     handler->ProxyRemoveTask(TASK_NAME_CLOSE_SESSION);
132 #endif
133 }
134 
GetUuid()135 std::string SoftBusChannel::GetUuid()
136 {
137     // to use a lib like libuuid
138     int uuidStrLen = 37; // 32+4+1
139     char uuidbuf[uuidStrLen];
140     RandomUuid(uuidbuf, uuidStrLen);
141     std::string uuid(uuidbuf);
142     LOGD(ATM_DOMAIN, ATM_TAG, "Generated message uuid: %{public}s", ConstantCommon::EncryptDevId(uuid).c_str());
143 
144     return uuid;
145 }
146 
InsertCallback(int result,std::string & uuid)147 void SoftBusChannel::InsertCallback(int result, std::string &uuid)
148 {
149     std::unique_lock<std::mutex> lock(socketMutex_);
150     std::function<void(const std::string &)> callback = [this](const std::string &result) {
151         responseResult_ = std::string(result);
152         loadedCond_.notify_all();
153         LOGD(ATM_DOMAIN, ATM_TAG, "OnResponse called end");
154     };
155     callbacks_.insert(std::pair<std::string, std::function<void(std::string)>>(uuid, callback));
156 
157     isSocketUsing_ = true;
158     lock.unlock();
159 }
160 
ExecuteCommand(const std::string & commandName,const std::string & jsonPayload)161 std::string SoftBusChannel::ExecuteCommand(const std::string &commandName, const std::string &jsonPayload)
162 {
163     if (commandName.empty() || jsonPayload.empty()) {
164         LOGE(ATM_DOMAIN, ATM_TAG, "Invalid params, commandName: %{public}s", commandName.c_str());
165         return "";
166     }
167 
168     std::string uuid = GetUuid();
169 
170     int len = static_cast<int32_t>(RPC_TRANSFER_HEAD_BYTES_LENGTH + jsonPayload.length());
171     unsigned char *buf = new (std::nothrow) unsigned char[len + 1];
172     if (buf == nullptr) {
173         LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory: %{public}d", len);
174         return "";
175     }
176     (void)memset_s(buf, len + 1, 0, len + 1);
177     BytesInfo info;
178     info.bytes = buf;
179     info.bytesLength = len;
180     int result = PrepareBytes(REQUEST_TYPE, uuid, commandName, jsonPayload, info);
181     if (result != Constant::SUCCESS) {
182         delete[] buf;
183         return "";
184     }
185     InsertCallback(result, uuid);
186     int retCode = SendRequestBytes(buf, info.bytesLength);
187     delete[] buf;
188 
189     std::unique_lock<std::mutex> lock2(socketMutex_);
190     if (retCode != Constant::SUCCESS) {
191         LOGE(ATM_DOMAIN, ATM_TAG, "Send request data failed: %{public}d ", retCode);
192         callbacks_.erase(uuid);
193         isSocketUsing_ = false;
194         return "";
195     }
196 
197     LOGD(ATM_DOMAIN, ATM_TAG, "Wait command response");
198     if (loadedCond_.wait_for(lock2, std::chrono::milliseconds(EXECUTE_COMMAND_TIME_OUT)) == std::cv_status::timeout) {
199         LOGW(ATM_DOMAIN, ATM_TAG, "Time out to wait response.");
200         callbacks_.erase(uuid);
201         isSocketUsing_ = false;
202         return "";
203     }
204 
205     isSocketUsing_ = false;
206     return responseResult_;
207 }
208 
HandleDataReceived(int socket,const unsigned char * bytes,int length)209 void SoftBusChannel::HandleDataReceived(int socket, const unsigned char *bytes, int length)
210 {
211     LOGD(ATM_DOMAIN, ATM_TAG, "HandleDataReceived");
212 #ifdef DEBUG_API_PERFORMANCE
213     LOGI(ATM_DOMAIN, ATM_TAG, "Api_performance:recieve message from softbus");
214 #endif
215     if (socket <= 0 || length <= 0) {
216         LOGE(ATM_DOMAIN, ATM_TAG, "Invalid params: socket: %{public}d, data length: %{public}d", socket, length);
217         return;
218     }
219     std::string receiveData = Decompress(bytes, length);
220     if (receiveData.empty()) {
221         LOGE(ATM_DOMAIN, ATM_TAG, "Invalid parameter bytes");
222         return;
223     }
224     std::shared_ptr<SoftBusMessage> message = SoftBusMessage::FromJson(receiveData);
225     if (message == nullptr) {
226         LOGD(ATM_DOMAIN, ATM_TAG, "Invalid json string");
227         return;
228     }
229     if (!message->IsValid()) {
230         LOGD(ATM_DOMAIN, ATM_TAG, "Invalid data, has empty field");
231         return;
232     }
233 
234     std::string type = message->GetType();
235     if (REQUEST_TYPE == (type)) {
236         std::function<void()> delayed = ([weak = weak_from_this(), socket, message]() {
237             auto self = weak.lock();
238             if (self == nullptr) {
239                 LOGE(ATM_DOMAIN, ATM_TAG, "SoftBusChannel is nullptr");
240                 return;
241             }
242             self->HandleRequest(socket, message->GetId(), message->GetCommandName(), message->GetJsonPayload());
243         });
244 
245 #ifdef EVENTHANDLER_ENABLE
246         std::shared_ptr<AccessEventHandler> handler =
247             DelayedSingleton<TokenSyncManagerService>::GetInstance()->GetRecvEventHandler();
248         if (handler == nullptr) {
249             LOGE(ATM_DOMAIN, ATM_TAG, "Fail to get EventHandler");
250             return;
251         }
252         handler->ProxyPostTask(delayed, "HandleDataReceived_HandleRequest");
253 #endif
254     } else if (RESPONSE_TYPE == (type)) {
255         HandleResponse(message->GetId(), message->GetJsonPayload());
256     } else {
257         LOGE(ATM_DOMAIN, ATM_TAG, "Invalid type: %{public}s ", type.c_str());
258     }
259 }
260 
PrepareBytes(const std::string & type,const std::string & id,const std::string & commandName,const std::string & jsonPayload,BytesInfo & info)261 int SoftBusChannel::PrepareBytes(const std::string &type, const std::string &id, const std::string &commandName,
262     const std::string &jsonPayload, BytesInfo &info)
263 {
264     SoftBusMessage messageEntity(type, id, commandName, jsonPayload);
265     std::string json = messageEntity.ToJson();
266     return Compress(json, info.bytes, info.bytesLength);
267 }
268 
Compress(const std::string & json,const unsigned char * compressedBytes,int & compressedLength)269 int SoftBusChannel::Compress(const std::string &json, const unsigned char *compressedBytes, int &compressedLength)
270 {
271     uLong len = compressBound(json.size());
272     // length will not so that long
273     if (compressedLength > 0 && static_cast<int32_t>(len) > compressedLength) {
274         LOGE(ATM_DOMAIN, ATM_TAG,
275             "compress error. data length overflow, bound length: %{public}d, buffer length: %{public}d",
276             static_cast<int32_t>(len), compressedLength);
277         return Constant::FAILURE;
278     }
279 
280     int result = compress(const_cast<Byte *>(compressedBytes), &len,
281         reinterpret_cast<unsigned char *>(const_cast<char *>(json.c_str())), json.size() + 1);
282     if (result != Z_OK) {
283         LOGE(ATM_DOMAIN, ATM_TAG, "Compress failed! error code: %{public}d", result);
284         return result;
285     }
286     LOGD(ATM_DOMAIN, ATM_TAG, "Compress complete. compress %{public}d bytes to %{public}d", compressedLength,
287         static_cast<int32_t>(len));
288     compressedLength = static_cast<int32_t>(len);
289     return Constant::SUCCESS;
290 }
291 
Decompress(const unsigned char * bytes,const int length)292 std::string SoftBusChannel::Decompress(const unsigned char *bytes, const int length)
293 {
294     LOGD(ATM_DOMAIN, ATM_TAG, "Input length: %{public}d", length);
295     uLong len = RPC_TRANSFER_BYTES_MAX_LENGTH;
296     unsigned char *buf = new (std::nothrow) unsigned char[len + 1];
297     if (buf == nullptr) {
298         LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory!");
299         return "";
300     }
301     (void)memset_s(buf, len + 1, 0, len + 1);
302     int result = uncompress(buf, &len, const_cast<unsigned char *>(bytes), length);
303     if (result != Z_OK) {
304         LOGE(ATM_DOMAIN, ATM_TAG,
305             "uncompress failed, error code: %{public}d, bound length: %{public}d, buffer length: %{public}d", result,
306             static_cast<int32_t>(len), length);
307         delete[] buf;
308         return "";
309     }
310     buf[len] = '\0';
311     std::string str(reinterpret_cast<char *>(buf));
312     delete[] buf;
313     return str;
314 }
315 
SendRequestBytes(const unsigned char * bytes,const int bytesLength)316 int SoftBusChannel::SendRequestBytes(const unsigned char *bytes, const int bytesLength)
317 {
318     if (bytesLength == 0) {
319         LOGE(ATM_DOMAIN, ATM_TAG, "Bytes data is invalid.");
320         return Constant::FAILURE;
321     }
322 
323     std::unique_lock<std::mutex> lock(socketMutex_);
324     if (CheckSessionMayReopenLocked() != Constant::SUCCESS) {
325         LOGE(ATM_DOMAIN, ATM_TAG, "Socket invalid and reopen failed!");
326         return Constant::FAILURE;
327     }
328 
329     LOGD(ATM_DOMAIN, ATM_TAG, "Send len (after compress len)= %{public}d", bytesLength);
330 #ifdef DEBUG_API_PERFORMANCE
331     LOGI(ATM_DOMAIN, ATM_TAG, "Api_performance:send command to softbus");
332 #endif
333     int result = ::SendBytes(socketFd_, bytes, bytesLength);
334     if (result != Constant::SUCCESS) {
335         LOGE(ATM_DOMAIN, ATM_TAG, "Fail to send! result= %{public}d", result);
336         return Constant::FAILURE;
337     }
338     LOGD(ATM_DOMAIN, ATM_TAG, "Send successfully.");
339     return Constant::SUCCESS;
340 }
341 
CheckSessionMayReopenLocked()342 int SoftBusChannel::CheckSessionMayReopenLocked()
343 {
344     // when socket is opened, we got a valid sessionid, when socket closed, we will reset sessionid.
345     if (IsSessionAvailable()) {
346         return Constant::SUCCESS;
347     }
348     int socket = SoftBusManager::GetInstance().BindService(deviceId_);
349     if (socket != Constant::INVALID_SESSION) {
350         socketFd_ = socket;
351         return Constant::SUCCESS;
352     }
353     return Constant::FAILURE;
354 }
355 
IsSessionAvailable()356 bool SoftBusChannel::IsSessionAvailable()
357 {
358     return socketFd_ > Constant::INVALID_SESSION;
359 }
360 
CancelCloseConnectionIfNeeded()361 void SoftBusChannel::CancelCloseConnectionIfNeeded()
362 {
363     std::unique_lock<std::mutex> lock(mutex_);
364     if (!isDelayClosing_) {
365         return;
366     }
367     LOGD(ATM_DOMAIN, ATM_TAG, "Cancel close connection");
368 
369     Release();
370     isDelayClosing_ = false;
371 }
372 
HandleRequest(int socket,const std::string & id,const std::string & commandName,const std::string & jsonPayload)373 void SoftBusChannel::HandleRequest(int socket, const std::string &id, const std::string &commandName,
374     const std::string &jsonPayload)
375 {
376     std::shared_ptr<BaseRemoteCommand> command =
377         RemoteCommandFactory::GetInstance().NewRemoteCommandFromJson(commandName, jsonPayload);
378     if (command == nullptr) {
379         // send result back directly
380         LOGW(ATM_DOMAIN, ATM_TAG, "Command %{public}s cannot get from json", commandName.c_str());
381 
382         int sendlen = static_cast<int32_t>(RPC_TRANSFER_HEAD_BYTES_LENGTH + jsonPayload.length());
383         unsigned char *sendbuf = new (std::nothrow) unsigned char[sendlen + 1];
384         if (sendbuf == nullptr) {
385             LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory: %{public}d", sendlen);
386             return;
387         }
388         (void)memset_s(sendbuf, sendlen + 1, 0, sendlen + 1);
389         BytesInfo info;
390         info.bytes = sendbuf;
391         info.bytesLength = sendlen;
392         int sendResult = PrepareBytes(RESPONSE_TYPE, id, commandName, jsonPayload, info);
393         if (sendResult != Constant::SUCCESS) {
394             delete[] sendbuf;
395             return;
396         }
397         int sendResultCode = SendResponseBytes(socket, sendbuf, info.bytesLength);
398         delete[] sendbuf;
399         LOGD(ATM_DOMAIN, ATM_TAG, "Send response result= %{public}d ", sendResultCode);
400         return;
401     }
402 
403     // execute command
404     command->Execute();
405     LOGD(ATM_DOMAIN, ATM_TAG, "Command uniqueId: %{public}s, finish with status: %{public}d, message: %{public}s",
406         ConstantCommon::EncryptDevId(command->remoteProtocol_.uniqueId).c_str(), command->remoteProtocol_.statusCode,
407         command->remoteProtocol_.message.c_str());
408 
409     // send result back
410     std::string resultJsonPayload = command->ToJsonPayload();
411     int len = static_cast<int32_t>(RPC_TRANSFER_HEAD_BYTES_LENGTH + resultJsonPayload.length());
412     unsigned char *buf = new (std::nothrow) unsigned char[len + 1];
413     if (buf == nullptr) {
414         LOGE(ATM_DOMAIN, ATM_TAG, "No enough memory: %{public}d", len);
415         return;
416     }
417     (void)memset_s(buf, len + 1, 0, len + 1);
418     BytesInfo info;
419     info.bytes = buf;
420     info.bytesLength = len;
421     int result = PrepareBytes(RESPONSE_TYPE, id, commandName, resultJsonPayload, info);
422     if (result != Constant::SUCCESS) {
423         delete[] buf;
424         return;
425     }
426     int retCode = SendResponseBytes(socket, buf, info.bytesLength);
427     delete[] buf;
428     LOGD(ATM_DOMAIN, ATM_TAG, "Send response result= %{public}d", retCode);
429 }
430 
HandleResponse(const std::string & id,const std::string & jsonPayload)431 void SoftBusChannel::HandleResponse(const std::string &id, const std::string &jsonPayload)
432 {
433     std::unique_lock<std::mutex> lock(socketMutex_);
434     auto callback = callbacks_.find(id);
435     if (callback != callbacks_.end()) {
436         (callback->second)(jsonPayload);
437         callbacks_.erase(callback);
438     }
439 }
440 
SendResponseBytes(int socket,const unsigned char * bytes,const int bytesLength)441 int SoftBusChannel::SendResponseBytes(int socket, const unsigned char *bytes, const int bytesLength)
442 {
443     LOGD(ATM_DOMAIN, ATM_TAG, "Send len (after compress len)= %{public}d", bytesLength);
444     int result = ::SendBytes(socket, bytes, bytesLength);
445     if (result != Constant::SUCCESS) {
446         LOGE(ATM_DOMAIN, ATM_TAG, "Fail to send! result= %{public}d", result);
447         return Constant::FAILURE;
448     }
449     LOGD(ATM_DOMAIN, ATM_TAG, "Send successfully.");
450     return Constant::SUCCESS;
451 }
452 
FromJson(const std::string & jsonString)453 std::shared_ptr<SoftBusMessage> SoftBusMessage::FromJson(const std::string &jsonString)
454 {
455     CJsonUnique json = CreateJsonFromString(jsonString);
456     if (json == nullptr || cJSON_IsObject(json.get()) == false) {
457         LOGE(ATM_DOMAIN, ATM_TAG, "Failed to parse jsonString");
458         return nullptr;
459     }
460 
461     std::string type;
462     std::string id;
463     std::string commandName;
464     std::string jsonPayload;
465     GetStringFromJson(json.get(), "type", type);
466     GetStringFromJson(json.get(), "id", id);
467     GetStringFromJson(json.get(), "commandName", commandName);
468     GetStringFromJson(json.get(), "jsonPayload", jsonPayload);
469     std::shared_ptr<SoftBusMessage> message = std::make_shared<SoftBusMessage>(type, id, commandName, jsonPayload);
470     return message;
471 }
472 } // namespace AccessToken
473 } // namespace Security
474 } // namespace OHOS
475