1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "channel_manager.h"
17 #include "network/softbus/softbus_permission_check.h"
18 #include "securec.h"
19 #include "softbus_error_code.h"
20 #include "utils_log.h"
21
22 #include <algorithm>
23 #include <chrono>
24 #include <future>
25 #include <sys/prctl.h>
26 #include <unordered_set>
27
28 namespace OHOS {
29 namespace Storage {
30 namespace DistributedFile {
31 IMPLEMENT_SINGLE_INSTANCE(ChannelManager);
32 using namespace std;
33 using namespace FileManagement;
34 using FileManagement::ERR_OK;
35
36 namespace {
37 static inline const std::string SERVICE_NAME = "ohos.storage.distributedfile.daemon";
38 static inline const std::string SESSION_NAME = "DistributedFileService_ChannelManager";
39
40 static const uint32_t DSCHED_MAX_BUFFER_SIZE = 4 * 1024 * 1024;
41 static const int32_t MAX_WAIT_TIME_MS = 2000;
42
43 static constexpr int32_t DFS_LOW_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
44 static constexpr int32_t DFS_LOW_QOS_TYPE_MAX_LATENCY = 10000;
45 static constexpr int32_t DFS_LOW_QOS_TYPE_MIN_LATENCY = 2000;
46 static constexpr int32_t DFS_LOW_QOS_TYPE_MAX_IDLE_TIMEOUT = 60 * 60 * 1000;
47
48 static QosTV g_low_qosInfo[] = {{.qos = QOS_TYPE_MIN_BW, .value = DFS_LOW_QOS_TYPE_MIN_BW},
49 {.qos = QOS_TYPE_MAX_LATENCY, .value = DFS_LOW_QOS_TYPE_MAX_LATENCY},
50 {.qos = QOS_TYPE_MIN_LATENCY, .value = DFS_LOW_QOS_TYPE_MIN_LATENCY},
51 {.qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = DFS_LOW_QOS_TYPE_MAX_IDLE_TIMEOUT}};
52
53 static uint32_t g_lowQosTvParamIndex = static_cast<uint32_t>(sizeof(g_low_qosInfo) / sizeof(QosTV));
54
55 } // namespace
56
OnSocketConnected(int32_t socket,PeerSocketInfo info)57 static void OnSocketConnected(int32_t socket, PeerSocketInfo info)
58 {
59 ChannelManager::GetInstance().OnSocketConnected(socket, info);
60 }
61
OnSocketClosed(int32_t socket,ShutdownReason reason)62 static void OnSocketClosed(int32_t socket, ShutdownReason reason)
63 {
64 ChannelManager::GetInstance().OnSocketClosed(socket, reason);
65 }
66
OnBytesRecv(int32_t socket,const void * data,uint32_t dataLen)67 static void OnBytesRecv(int32_t socket, const void *data, uint32_t dataLen)
68 {
69 ChannelManager::GetInstance().OnBytesReceived(socket, data, dataLen);
70 }
71
OnError(int32_t socket,int32_t errCode)72 static void OnError(int32_t socket, int32_t errCode)
73 {
74 ChannelManager::GetInstance().OnSocketError(socket, errCode);
75 }
76
OnNegotiate2(int32_t socket,PeerSocketInfo info,SocketAccessInfo * peerInfo,SocketAccessInfo * localInfo)77 static bool OnNegotiate2(int32_t socket, PeerSocketInfo info, SocketAccessInfo *peerInfo, SocketAccessInfo *localInfo)
78 {
79 return ChannelManager::GetInstance().OnNegotiate2(socket, info, peerInfo, localInfo);
80 }
81
82 ISocketListener channelManagerListener = {
83 .OnBind = OnSocketConnected,
84 .OnShutdown = OnSocketClosed,
85 .OnBytes = OnBytesRecv,
86 .OnMessage = nullptr,
87 .OnStream = nullptr,
88 .OnFile = nullptr,
89 .OnQos = nullptr,
90 .OnError = OnError,
91 .OnNegotiate2 = OnNegotiate2,
92 };
93
~ChannelManager()94 ChannelManager::~ChannelManager()
95 {
96 DeInit();
97 };
98
Init()99 int32_t ChannelManager::Init()
100 {
101 LOGI("start init channel manager");
102 std::lock_guard<std::mutex> initLock(initMutex_);
103
104 if (eventHandler_ != nullptr && callbackEventHandler_ != nullptr && serverSocketId_ > 0) {
105 LOGW("server channel already init");
106 return ERR_OK;
107 }
108
109 int32_t socketServerId = CreateServerSocket();
110 if (socketServerId <= 0) {
111 LOGE("create socket failed, ret: %{public}d", socketServerId);
112 return ERR_CREATE_SOCKET_FAILED;
113 }
114
115 int32_t ret = Listen(socketServerId, g_low_qosInfo, g_lowQosTvParamIndex, &channelManagerListener);
116 if (ret != ERR_OK) {
117 LOGE("service listen failed, ret: %{public}d", ret);
118 return ERR_LISTEN_SOCKET_FAILED;
119 }
120
121 eventThread_ = std::thread(&ChannelManager::StartEvent, this);
122 std::unique_lock<std::mutex> lock(eventMutex_);
123 eventCon_.wait(lock, [this] { return eventHandler_ != nullptr; });
124
125 callbackEventThread_ = std::thread(&ChannelManager::StartCallbackEvent, this);
126 std::unique_lock<std::mutex> callbackLock(callbackEventMutex_);
127 callbackEventCon_.wait(callbackLock, [this] { return callbackEventHandler_ != nullptr; });
128 serverSocketId_ = socketServerId;
129
130 LOGI("end init channel manager");
131 return ERR_OK;
132 }
133
DeInit()134 void ChannelManager::DeInit()
135 {
136 LOGI("start deInit channel manager");
137 std::lock_guard<std::mutex> initLock(initMutex_);
138 // stop send task
139 if (eventHandler_ != nullptr) {
140 eventHandler_->GetEventRunner()->Stop();
141 if (eventThread_.joinable()) {
142 eventThread_.join();
143 }
144 eventHandler_ = nullptr;
145 }
146
147 // stop callback task
148 if (callbackEventHandler_ != nullptr) {
149 callbackEventHandler_->GetEventRunner()->Stop();
150 if (callbackEventThread_.joinable()) {
151 callbackEventThread_.join();
152 }
153 callbackEventHandler_ = nullptr;
154 }
155
156 // release socket
157 {
158 std::unique_lock<std::shared_mutex> writeLock(clientMutex_);
159 for (auto it = clientNetworkSocketMap_.begin(); it != clientNetworkSocketMap_.end(); ++it) {
160 LOGI("ShutDown socketId %{public}d from clientNetworkSocketMap_", it->second);
161 Shutdown(it->second);
162 }
163 clientNetworkSocketMap_.clear();
164 }
165
166 {
167 std::unique_lock<std::shared_mutex> writeLock(serverMutex_);
168 for (auto it = serverNetworkSocketMap_.begin(); it != serverNetworkSocketMap_.end(); ++it) {
169 LOGI("ShutDown socketId %{public}d from serverNetworkSocketMap_", it->second);
170 Shutdown(it->second);
171 }
172 serverNetworkSocketMap_.clear();
173 }
174
175 Shutdown(serverSocketId_);
176 serverSocketId_ = -1;
177 LOGI("end deInit");
178 }
179
StartEvent()180 void ChannelManager::StartEvent()
181 {
182 LOGI("StartEvent start");
183 prctl(PR_SET_NAME, ownerName_.c_str());
184 auto runner = AppExecFwk::EventRunner::Create(false);
185 {
186 std::lock_guard<std::mutex> lock(eventMutex_);
187 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
188 }
189 eventCon_.notify_one();
190 runner->Run();
191 LOGI("StartEvent end");
192 }
193
StartCallbackEvent()194 void ChannelManager::StartCallbackEvent()
195 {
196 LOGI("StartCallbackEvent start");
197 std::string callbackName = ownerName_ + "callback";
198 prctl(PR_SET_NAME, callbackName.c_str());
199 auto runner = AppExecFwk::EventRunner::Create(false);
200 {
201 std::lock_guard<std::mutex> lock(callbackEventMutex_);
202 callbackEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
203 }
204 callbackEventCon_.notify_one();
205 runner->Run();
206 LOGI("StartCallbackEvent end");
207 }
208
PostTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)209 int32_t ChannelManager::PostTask(const AppExecFwk::InnerEvent::Callback &callback,
210 const AppExecFwk::EventQueue::Priority priority)
211 {
212 if (eventHandler_ == nullptr) {
213 LOGE("event handler empty");
214 return ERR_NULL_EVENT_HANDLER;
215 }
216 if (eventHandler_->PostTask(callback, priority)) {
217 return ERR_OK;
218 }
219 LOGE("add task failed");
220 return ERR_POST_TASK_FAILED;
221 }
222
PostCallbackTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)223 int32_t ChannelManager::PostCallbackTask(const AppExecFwk::InnerEvent::Callback &callback,
224 const AppExecFwk::EventQueue::Priority priority)
225 {
226 if (callbackEventHandler_ == nullptr) {
227 LOGE("callback event handler empty");
228 return ERR_NULL_EVENT_HANDLER;
229 }
230 if (callbackEventHandler_->PostTask(callback, priority)) {
231 return ERR_OK;
232 }
233 LOGE("add callback task failed");
234 return ERR_POST_TASK_FAILED;
235 }
236
CreateClientChannel(const std::string & networkId)237 int32_t ChannelManager::CreateClientChannel(const std::string &networkId)
238 {
239 LOGI("CreateClientChannel start, networkId: %{public}.6s", networkId.c_str());
240 if (!SoftBusPermissionCheck::CheckSrcPermission(networkId)) {
241 LOGE("CheckSrcPermission failed");
242 return ERR_CHECK_PERMISSION_FAILED;
243 }
244
245 if (!SoftBusPermissionCheck::IsSameAccount(networkId)) {
246 LOGE("The calling device is not trusted");
247 return ERR_CHECK_PERMISSION_FAILED;
248 }
249
250 {
251 std::shared_lock<std::shared_mutex> readLock(clientMutex_);
252 auto channelIt = clientNetworkSocketMap_.find(networkId);
253 if (channelIt != clientNetworkSocketMap_.end()) {
254 LOGW("has connect to this network");
255 return ERR_OK;
256 }
257 }
258
259 int32_t socketId = CreateClientSocket(networkId);
260 if (socketId <= 0) {
261 LOGE("create client socket failed");
262 return ERR_BAD_VALUE;
263 }
264 if (!SoftBusPermissionCheck::SetAccessInfoToSocket(socketId)) {
265 LOGE("Fill and set accessInfo failed");
266 return ERR_BAD_VALUE;
267 }
268
269 LOGI("start to bind socket, id:%{public}d", socketId);
270 int32_t ret = Bind(socketId, g_low_qosInfo, g_lowQosTvParamIndex, &channelManagerListener);
271 if (ret != ERR_OK) {
272 LOGE("client bind failed, ret: %{public}d", ret);
273 Shutdown(socketId);
274 return ERR_BIND_SOCKET_FAILED;
275 }
276
277 std::lock_guard<std::shared_mutex> writeLock(clientMutex_);
278 clientNetworkSocketMap_[networkId] = socketId;
279
280 LOGI("CreateClientChannel end");
281 return ERR_OK;
282 }
283
DestroyClientChannel(const std::string & networkId)284 int32_t ChannelManager::DestroyClientChannel(const std::string &networkId)
285 {
286 LOGI("DestroyClientChannel start, networkId: %{public}.6s", networkId.c_str());
287 int32_t socketId = -1;
288 {
289 std::unique_lock<std::shared_mutex> writeLock(clientMutex_);
290 auto channelIt = clientNetworkSocketMap_.find(networkId);
291 if (channelIt == clientNetworkSocketMap_.end()) {
292 LOGE("has not connect to this network");
293 return ERR_BAD_VALUE;
294 } else {
295 socketId = channelIt->second;
296 clientNetworkSocketMap_.erase(channelIt);
297 }
298 }
299
300 if (socketId <= 0) {
301 LOGE("socket id invalid");
302 return ERR_BAD_VALUE;
303 }
304
305 Shutdown(socketId);
306 return ERR_OK;
307 }
308
HasExistChannel(const std::string & networkId)309 bool ChannelManager::HasExistChannel(const std::string &networkId)
310 {
311 std::shared_lock<std::shared_mutex> readLock(clientMutex_);
312 return clientNetworkSocketMap_.count(networkId) > 0;
313 }
314
CreateServerSocket()315 int32_t ChannelManager::CreateServerSocket()
316 {
317 LOGI("start create server socket");
318 SocketInfo info = {
319 .name = const_cast<char *>(SESSION_NAME.c_str()),
320 .pkgName = const_cast<char *>(SERVICE_NAME.c_str()),
321 };
322
323 int32_t socketId = Socket(info);
324 LOGI("finish, socket id: %{public}d", socketId);
325 return socketId;
326 }
327
CreateClientSocket(const std::string & peerNetworkId)328 int32_t ChannelManager::CreateClientSocket(const std::string &peerNetworkId)
329 {
330 LOGI("CreateClientSocket start");
331 SocketInfo socketInfo = {.name = const_cast<char *>(SESSION_NAME.c_str()),
332 .peerName = const_cast<char *>(SESSION_NAME.c_str()),
333 .peerNetworkId = const_cast<char *>(peerNetworkId.c_str()),
334 .pkgName = const_cast<char *>(SERVICE_NAME.c_str()),
335 .dataType = DATA_TYPE_BYTES};
336
337 int32_t socketId = Socket(socketInfo);
338 LOGI("finish, socket session id: %{public}d", socketId);
339 return socketId;
340 }
341
SendBytes(const std::string & networkId,const std::string & data)342 int32_t ChannelManager::SendBytes(const std::string &networkId, const std::string &data)
343 {
344 LOGI("start send bytes. networkId: %{public}.6s, data size: %{public}zu", networkId.c_str(), data.length());
345 int32_t targetSocketId = -1;
346 {
347 std::shared_lock<std::shared_mutex> readLock(clientMutex_);
348 auto channelIt = clientNetworkSocketMap_.find(networkId);
349 if (channelIt == clientNetworkSocketMap_.end()) {
350 LOGE("has not connect to this network");
351 return ERR_BAD_VALUE;
352 }
353 targetSocketId = channelIt->second;
354 }
355
356 int32_t ret = DoSendBytes(targetSocketId, data);
357 LOGI("finish, send bytes ret: %{public}d", ret);
358 return ret;
359 }
360
DoSendBytes(const std::int32_t socketId,const std::string & data)361 int32_t ChannelManager::DoSendBytes(const std::int32_t socketId, const std::string &data)
362 {
363 LOGI("DoSendBytes start, dataLen: %{public}zu", data.length());
364 int32_t ret = ::SendBytes(socketId, data.c_str(), data.length());
365 if (ret != SOFTBUS_OK) {
366 LOGE("Send data buffer failed. ret: %{public}d", ret);
367 return ERR_SEND_DATA_BY_SOFTBUS_FAILED;
368 }
369 LOGI("DoSendBytes end");
370 return ERR_OK;
371 }
372
OnSocketConnected(int32_t socketId,const PeerSocketInfo & info)373 void ChannelManager::OnSocketConnected(int32_t socketId, const PeerSocketInfo &info)
374 {
375 LOGI("socket %{public}d bind now", socketId);
376 if (socketId <= 0) {
377 LOGE("invalid socket id, %{public}d", socketId);
378 return;
379 }
380 std::string clientNetworkId = info.networkId;
381
382 std::lock_guard<std::shared_mutex> writeLock(serverMutex_);
383 serverNetworkSocketMap_[clientNetworkId] = socketId;
384
385 LOGI("add update channel task into handler");
386 }
387
OnSocketError(int32_t socketId,const int32_t errorCode)388 void ChannelManager::OnSocketError(int32_t socketId, const int32_t errorCode)
389 {
390 LOGE("socket error. socketId: %{public}d, errorCode: %{public}d", socketId, errorCode);
391 }
392
OnSocketClosed(int32_t socketId,const ShutdownReason & reason)393 void ChannelManager::OnSocketClosed(int32_t socketId, const ShutdownReason &reason)
394 {
395 LOGI("socket closed. socketId: %{public}d, reason: %{public}d", socketId, reason);
396 if (socketId <= 0) {
397 LOGE("invalid socket id, %{public}d", socketId);
398 return;
399 }
400
401 {
402 std::unique_lock<std::shared_mutex> writeLock(clientMutex_);
403 for (auto it = clientNetworkSocketMap_.begin(); it != clientNetworkSocketMap_.end();) {
404 if (it->second == socketId) {
405 LOGI("Removed socketId %{public}d from clientNetworkSocketMap_", socketId);
406 it = clientNetworkSocketMap_.erase(it);
407 } else {
408 ++it;
409 }
410 }
411 }
412
413 {
414 std::unique_lock<std::shared_mutex> writeLock(serverMutex_);
415 for (auto it = serverNetworkSocketMap_.begin(); it != serverNetworkSocketMap_.end();) {
416 if (it->second == socketId) {
417 LOGI("Removed socketId %{public}d from serverNetworkSocketMap_", socketId);
418 it = serverNetworkSocketMap_.erase(it);
419 } else {
420 ++it;
421 }
422 }
423 }
424 }
425
OnNegotiate2(int32_t socket,PeerSocketInfo info,SocketAccessInfo * peerInfo,SocketAccessInfo * localInfo)426 bool ChannelManager::OnNegotiate2(int32_t socket,
427 PeerSocketInfo info,
428 SocketAccessInfo *peerInfo,
429 SocketAccessInfo *localInfo)
430 {
431 AccountInfo callerAccountInfo;
432 std::string networkId = info.networkId;
433 if (!SoftBusPermissionCheck::TransCallerInfo(peerInfo, callerAccountInfo, networkId)) {
434 LOGE("peerInfo is nullptr.");
435 return false;
436 }
437 if (!SoftBusPermissionCheck::FillLocalInfo(localInfo)) {
438 LOGE("FillLocalInfo failed.");
439 return false;
440 }
441 if (!SoftBusPermissionCheck::IsSameAccount(networkId)) {
442 LOGE("The calling device is not trusted");
443 return false;
444 }
445 return SoftBusPermissionCheck::CheckSinkPermission(callerAccountInfo);
446 }
447
HandleRemoteBytes(const std::string & jsonStr,int32_t socketId)448 void ChannelManager::HandleRemoteBytes(const std::string &jsonStr, int32_t socketId)
449 {
450 ControlCmd inCmd;
451 if (!ControlCmdParser::ParseFromJson(jsonStr, inCmd)) {
452 LOGE("Invalid json format");
453 return;
454 }
455
456 if (inCmd.msgType == ControlCmdType::CMD_MSG_RESPONSE) {
457 LOGI("remote bytes type is response.");
458 std::unique_lock<std::mutex> lock(mtx_);
459 int32_t msgId = inCmd.msgId;
460
461 auto it = pendingResponses_.find(msgId);
462 if (it != pendingResponses_.end()) {
463 std::unique_lock<std::mutex> waiterLock(it->second->mutex);
464 it->second->response = inCmd;
465 it->second->received = true;
466 it->second->cv.notify_one();
467 return;
468 }
469 LOGE("msgId not found in pendingResponses_ %{public}d", msgId);
470 return;
471 }
472
473 ControlCmd outCmd;
474 if (!ControlCmdParser::HandleRequest(inCmd, outCmd)) {
475 LOGE("HandleRequest failed, msgType: %{public}d", inCmd.msgType);
476 return;
477 }
478
479 std::string outJsonStr;
480 if (outCmd.msgType != ControlCmdType::CMD_UNKNOWN && ControlCmdParser::SerializeToJson(outCmd, outJsonStr)) {
481 LOGI("Send response length: %{public}zu", outJsonStr.length());
482 DoSendBytes(socketId, outJsonStr);
483 return;
484 }
485 }
486
DoSendBytesAsync(const ControlCmd & request,const std::string & networkId)487 void ChannelManager::DoSendBytesAsync(const ControlCmd &request, const std::string &networkId)
488 {
489 std::string data;
490 if (!ControlCmdParser::SerializeToJson(request, data)) {
491 LOGE("SerializeToJson failed, requestId is %{public}d", request.msgId);
492 return;
493 }
494 SendBytes(networkId, data);
495 return;
496 }
497
OnBytesReceived(int32_t socketId,const void * data,const uint32_t dataLen)498 void ChannelManager::OnBytesReceived(int32_t socketId, const void *data, const uint32_t dataLen)
499 {
500 LOGI("socket %{public}d receive data, len=%{public}d", socketId, dataLen);
501 if (socketId <= 0) {
502 LOGE("invalid socket id, %{public}d", socketId);
503 return;
504 }
505
506 if (data == nullptr || dataLen == 0 || dataLen > DSCHED_MAX_BUFFER_SIZE) {
507 LOGE("Invalid data, data len = %{public}u", dataLen);
508 return;
509 }
510
511 const char *charData = static_cast<const char *>(data);
512 std::string jsonStr(charData, dataLen);
513
514 auto func = [this, jsonStr, socketId]() { HandleRemoteBytes(jsonStr, socketId); };
515 PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
516 }
517
SendRequest(const std::string & networkId,ControlCmd & request,ControlCmd & response,bool needResponse)518 int32_t ChannelManager::SendRequest(const std::string &networkId,
519 ControlCmd &request,
520 ControlCmd &response,
521 bool needResponse)
522 {
523 LOGI("start sendRequest, networkId: %{public}.6s", networkId.c_str());
524 {
525 std::shared_lock<std::shared_mutex> readLock(clientMutex_);
526 if (clientNetworkSocketMap_.count(networkId) == 0) {
527 LOGE("networkId not found");
528 return ERR_NO_EXIST_CHANNEL;
529 }
530 }
531 int32_t msgId = request.msgId;
532 std::shared_ptr<ResponseWaiter> waiter;
533 if (needResponse) {
534 waiter = std::make_shared<ResponseWaiter>();
535 std::unique_lock<std::mutex> lock(mtx_);
536 pendingResponses_.emplace(msgId, waiter);
537 }
538
539 // Serialize and send the request (async operation)
540 auto sendFunc = [this, request, networkId]() { DoSendBytesAsync(request, networkId); };
541
542 auto ret = PostTask(sendFunc, AppExecFwk::EventQueue::Priority::IMMEDIATE);
543 if (ret != E_OK) {
544 LOGE("failed to add send bytes task, ret=%{public}d", ret);
545 if (waiter) {
546 std::unique_lock<std::mutex> lock(mtx_);
547 pendingResponses_.erase(msgId);
548 }
549 return ret;
550 }
551 if (needResponse && waiter) {
552 std::unique_lock<std::mutex> waiterLock(waiter->mutex);
553 bool received = waiter->cv.wait_for(waiterLock, std::chrono::milliseconds(MAX_WAIT_TIME_MS),
554 [waiter] { return waiter->received; });
555 if (received) {
556 response = waiter->response;
557 std::string responseStr;
558 ControlCmdParser::SerializeToJson(response, responseStr);
559 LOGI("response length is %{public}zu", responseStr.length());
560 } else {
561 LOGE("Timeout waiting for response");
562 ret = E_TIMEOUT;
563 }
564 std::lock_guard<std::mutex> lock(mtx_);
565 pendingResponses_.erase(msgId);
566 }
567 return ret;
568 }
569
570 } // namespace DistributedFile
571 } // namespace Storage
572 } // namespace OHOS