1 /*
2 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "channel_manager.h"
17
18 #include <algorithm>
19 #include <chrono>
20 #include <dlfcn.h>
21 #include <future>
22 #include <sys/prctl.h>
23 #include <unistd.h>
24 #include <unordered_set>
25
26 #include "dtbcollabmgr_log.h"
27 #include "securec.h"
28 #include "softbus_error_code.h"
29
30 namespace OHOS {
31 namespace DistributedCollab {
32 IMPLEMENT_SINGLE_INSTANCE(ChannelManager);
33 namespace {
34 static const std::string TAG = "DSchedCollabChannelManager";
35 constexpr int32_t BIND_RETRY_INTERVAL = 500;
36 constexpr int32_t MAX_BIND_RETRY_TIME = 1500;
37 constexpr int32_t MAX_RETRY_TIMES = 3;
38 constexpr int32_t MS_TO_US = 1000;
39 enum class QosSpeedType {
40 HIGH,
41 LOW
42 };
43
44 static std::map<ChannelDataType, TransDataType> CHANNEL_SOFTBUS_DATATYPE_MAP = {
45 { ChannelDataType::MESSAGE, DATA_TYPE_MESSAGE },
46 { ChannelDataType::BYTES, DATA_TYPE_BYTES },
47 { ChannelDataType::VIDEO_STREAM, DATA_TYPE_VIDEO_STREAM },
48 { ChannelDataType::FILE, DATA_TYPE_FILE }
49 };
50
51 static std::map<ChannelDataType, QosSpeedType> CHANNEL_DATATYPE_SPEED_MAP = {
52 { ChannelDataType::MESSAGE, QosSpeedType::LOW },
53 { ChannelDataType::BYTES, QosSpeedType::LOW },
54 { ChannelDataType::VIDEO_STREAM, QosSpeedType::HIGH },
55 { ChannelDataType::FILE, QosSpeedType::HIGH }
56 };
57
58 static const std::string SPLIT_FLAG = "_";
59 static const std::string COLLAB_PGK_NAME = "dms";
60 static const std::string SESSION_NAME_PREFIX = "ohos.dtbcollab.dms";
61
62 static std::map<ChannelDataType, std::string> CHANNEL_DATATYPE_PREFIX_MAP = {
63 { ChannelDataType::MESSAGE, "M" },
64 { ChannelDataType::BYTES, "B" },
65 { ChannelDataType::VIDEO_STREAM, "V" },
66 { ChannelDataType::FILE, "F" }
67 };
68
69 static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
70 static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY = 10000;
71 static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY = 2000;
72
73 static QosTV g_low_qosInfo[] = {
74 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW },
75 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY },
76 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY },
77 { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
78 };
79
80 static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW = 80 * 1024 * 1024;
81 static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY = 10000;
82 static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY = 2000;
83
84 static QosTV g_high_qosInfo[] = {
85 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW },
86 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY },
87 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY },
88 { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
89 };
90
91 static std::map<QosSpeedType, QosTV*> qos_config = {
92 { QosSpeedType::HIGH, g_high_qosInfo },
93 { QosSpeedType::LOW, g_low_qosInfo }
94 };
95
96 static uint32_t g_lowQosTvParamIndex = static_cast<uint32_t>(sizeof(g_low_qosInfo) / sizeof(QosTV));
97 static uint32_t g_highQosTvParamIndex = static_cast<uint32_t>(sizeof(g_high_qosInfo) / sizeof(QosTV));
98 static std::map<QosSpeedType, uint32_t> qos_speed_config = {
99 { QosSpeedType::HIGH, g_highQosTvParamIndex },
100 { QosSpeedType::LOW, g_lowQosTvParamIndex }
101 };
102
103 #define CHECK_SOCKET_ID(socketId) \
104 do { \
105 if ((socketId) <= 0) { \
106 HILOGE("invalid socket id, %{public}d", socketId); \
107 return; \
108 } \
109 } while (0)
110
111 #define CHECK_CHANNEL_ID(socketId, channelId) \
112 do { \
113 (channelId) = GetChannelId(socketId); \
114 if (!isValidChannelId(channelId)) { \
115 HILOGE("invalid socket id %{public}d, can't find channel", socketId); \
116 return; \
117 } \
118 } while (0)
119
120 #define CHECK_DATA_NULL(socketId, data, errorHandler) \
121 do { \
122 if ((data) == nullptr) { \
123 HILOGE("receive empty bytes data, socketId=%{public}d", socketId); \
124 (errorHandler)(channelId, RECV_DATA_EMPTY); \
125 return; \
126 } \
127 } while (0)
128 }
129
130 namespace {
131 constexpr int32_t MAX_LEN = 10 * 1024;
132 }
133
OnSocketConnected(int32_t socket,PeerSocketInfo info)134 static void OnSocketConnected(int32_t socket, PeerSocketInfo info)
135 {
136 ChannelManager::GetInstance().OnSocketConnected(socket, info);
137 }
138
OnSocketClosed(int32_t socket,ShutdownReason reason)139 static void OnSocketClosed(int32_t socket, ShutdownReason reason)
140 {
141 ChannelManager::GetInstance().OnSocketClosed(socket, reason);
142 }
143
OnBytesRecv(int32_t socket,const void * data,uint32_t dataLen)144 static void OnBytesRecv(int32_t socket, const void* data, uint32_t dataLen)
145 {
146 ChannelManager::GetInstance().OnBytesReceived(socket, data, dataLen);
147 }
148
OnMessageRecv(int32_t socket,const void * data,uint32_t dataLen)149 static void OnMessageRecv(int32_t socket, const void* data, uint32_t dataLen)
150 {
151 ChannelManager::GetInstance().OnMessageReceived(socket, data, dataLen);
152 }
153
OnStreamRecv(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)154 static void OnStreamRecv(int32_t socket, const StreamData* data, const StreamData* ext,
155 const StreamFrameInfo* param)
156 {
157 ChannelManager::GetInstance().OnStreamReceived(socket, data, ext, param);
158 }
159
OnError(int32_t socket,int32_t errCode)160 static void OnError(int32_t socket, int32_t errCode)
161 {
162 ChannelManager::GetInstance().OnSocketError(socket, errCode);
163 }
164
OnFileEvent(int32_t socket,FileEvent * event)165 static void OnFileEvent(int32_t socket, FileEvent *event)
166 {
167 ChannelManager::GetInstance().OnFileEventReceived(socket, event);
168 }
169
GetRecvPath()170 static const char* GetRecvPath()
171 {
172 return ChannelManager::GetInstance().GetRecvPathFromUser();
173 }
174
175 ISocketListener channelManagerListener = {
176 .OnBind = OnSocketConnected,
177 .OnShutdown = OnSocketClosed,
178 .OnBytes = OnBytesRecv,
179 .OnMessage = OnMessageRecv,
180 .OnStream = OnStreamRecv,
181 .OnFile = OnFileEvent,
182 .OnError = OnError,
183 };
184
~ChannelManager()185 ChannelManager::~ChannelManager()
186 {
187 DeInit();
188 };
189
Init(const std::string & ownerName)190 int32_t ChannelManager::Init(const std::string& ownerName)
191 {
192 HILOGI("start init channel manager");
193 if (eventHandler_ != nullptr && callbackEventHandler_ != nullptr && msgEventHandler_ != nullptr &&
194 callbackEventHandlerNew_ != nullptr) {
195 HILOGW("server channel already init");
196 return ERR_OK;
197 }
198 if (serverSocketId_ > 0) {
199 HILOGW("server socket already init");
200 return ERR_OK;
201 }
202 ownerName_ = ownerName;
203
204 eventThread_ = std::thread(&ChannelManager::StartEvent, this);
205 std::unique_lock<std::mutex> lock(eventMutex_);
206 eventCon_.wait(lock, [this] {
207 return eventHandler_ != nullptr;
208 });
209
210 callbackEventThread_ = std::thread(&ChannelManager::StartCallbackEvent, this);
211 std::unique_lock<std::mutex> callbackLock(callbackEventMutex_);
212 callbackEventCon_.wait(callbackLock, [this] {
213 return callbackEventHandler_ != nullptr;
214 });
215
216 msgEventThread_ = std::thread(&ChannelManager::StartMsgEvent, this);
217 std::unique_lock<std::mutex> msgLock(msgEventMutex_);
218 msgEventCon_.wait(msgLock, [this] {
219 return msgEventHandler_ != nullptr;
220 });
221
222 callbackEventNewThread_ = std::thread(&ChannelManager::StartCallbackEventNew, this);
223 std::unique_lock<std::mutex> callbackNewLock(callbackEventNewMutex_);
224 callbackEventNewCon_.wait(callbackNewLock, [this] {
225 return callbackEventHandlerNew_ != nullptr;
226 });
227
228 int32_t socketServerId = CreateServerSocket();
229 if (socketServerId <= 0) {
230 HILOGE("create socket failed, ret: %{public}d", socketServerId);
231 return CREATE_SOCKET_FAILED;
232 }
233 int32_t ret = Listen(socketServerId, g_low_qosInfo, g_lowQosTvParamIndex, &channelManagerListener);
234 if (ret != ERR_OK) {
235 HILOGE("service listen failed, ret: %{public}d", ret);
236 return LISTEN_SOCKET_FAILED;
237 }
238 serverSocketId_ = socketServerId;
239 int32_t result = GetDmsInteractiveAdapterProxy();
240 if (result != ERR_OK) {
241 HILOGE("Get remote dms interactive adapter proxy fail, ret %{public}d.", ret);
242 }
243 HILOGI("end");
244 return ERR_OK;
245 }
246
StartEvent()247 void ChannelManager::StartEvent()
248 {
249 HILOGI("StartEvent start");
250 prctl(PR_SET_NAME, ownerName_.c_str());
251 auto runner = AppExecFwk::EventRunner::Create(false);
252 {
253 std::lock_guard<std::mutex> lock(eventMutex_);
254 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
255 }
256 eventCon_.notify_one();
257 runner->Run();
258 HILOGI("StartEvent end");
259 }
260
StartCallbackEvent()261 void ChannelManager::StartCallbackEvent()
262 {
263 HILOGI("Start callback event start");
264 std::string callbackName = ownerName_ + "callback";
265 prctl(PR_SET_NAME, callbackName.c_str());
266 auto runner = AppExecFwk::EventRunner::Create(false);
267 {
268 std::lock_guard<std::mutex> lock(callbackEventMutex_);
269 callbackEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
270 }
271 callbackEventCon_.notify_one();
272 runner->Run();
273 HILOGI("callback event end");
274 }
275
StartCallbackEventNew()276 void ChannelManager::StartCallbackEventNew()
277 {
278 HILOGI("Start new callback event start");
279 std::string callbackName = ownerName_ + "callbackNew";
280 prctl(PR_SET_NAME, callbackName.c_str());
281 auto runner = AppExecFwk::EventRunner::Create(false);
282 {
283 std::lock_guard<std::mutex> lock(callbackEventNewMutex_);
284 callbackEventHandlerNew_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
285 }
286 callbackEventNewCon_.notify_one();
287 runner->Run();
288 HILOGI("new callback event end");
289 }
290
StartMsgEvent()291 void ChannelManager::StartMsgEvent()
292 {
293 HILOGI("Start msg event start");
294 std::string msgName = ownerName_ + "msg";
295 prctl(PR_SET_NAME, msgName.c_str());
296 auto runner = AppExecFwk::EventRunner::Create(false);
297 {
298 std::lock_guard<std::mutex> lock(msgEventMutex_);
299 msgEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
300 }
301 msgEventCon_.notify_one();
302 runner->Run();
303 HILOGI("msg event end");
304 }
305
PostTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority,const std::string & name)306 int32_t ChannelManager::PostTask(const AppExecFwk::InnerEvent::Callback& callback,
307 const AppExecFwk::EventQueue::Priority priority, const std::string& name)
308 {
309 if (eventHandler_ == nullptr) {
310 HILOGE("event handler empty");
311 return NULL_EVENT_HANDLER;
312 }
313 if (eventHandler_->PostTask(callback, name, 0, priority)) {
314 return ERR_OK;
315 }
316 HILOGE("add task failed");
317 return POST_TASK_FAILED;
318 }
319
PostCallbackTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)320 int32_t ChannelManager::PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback,
321 const AppExecFwk::EventQueue::Priority priority)
322 {
323 if (callbackEventHandler_ == nullptr) {
324 HILOGE("callback event handler empty");
325 return NULL_EVENT_HANDLER;
326 }
327 if (callbackEventHandler_->PostTask(callback, priority)) {
328 return ERR_OK;
329 }
330 HILOGE("add callback task failed");
331 return POST_TASK_FAILED;
332 }
333
PostCallbackTaskNew(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)334 int32_t ChannelManager::PostCallbackTaskNew(const AppExecFwk::InnerEvent::Callback& callback,
335 const AppExecFwk::EventQueue::Priority priority)
336 {
337 if (callbackEventHandlerNew_ == nullptr) {
338 HILOGE("new callback event handler empty");
339 return NULL_EVENT_HANDLER;
340 }
341 if (callbackEventHandlerNew_->PostTask(callback, priority)) {
342 return ERR_OK;
343 }
344 HILOGE("add new callback task failed");
345 return POST_TASK_FAILED;
346 }
347
PostMsgTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)348 int32_t ChannelManager::PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback,
349 const AppExecFwk::EventQueue::Priority priority)
350 {
351 if (msgEventHandler_ == nullptr) {
352 HILOGE("msg event handler empty");
353 return NULL_EVENT_HANDLER;
354 }
355 if (msgEventHandler_->PostTask(callback, priority)) {
356 return ERR_OK;
357 }
358 HILOGE("add msg task failed");
359 return POST_TASK_FAILED;
360 }
361
CreateServerSocket()362 int32_t ChannelManager::CreateServerSocket()
363 {
364 HILOGI("start create server socket");
365 std::string sessionName = SESSION_NAME_PREFIX + ownerName_;
366 HILOGI("sessionName: %{public}s, size: %{public}zu", sessionName.c_str(),
367 sessionName.length());
368 SocketInfo info = {
369 .name = const_cast<char*>(sessionName.c_str()),
370 .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
371 };
372 int32_t socket = Socket(info);
373 HILOGI("finish, socket id: %{public}d", socket);
374 return socket;
375 }
376
DeInit()377 void ChannelManager::DeInit()
378 {
379 HILOGI("start deinit channel manager");
380 // stop all task
381 if (eventHandler_ != nullptr) {
382 eventHandler_->GetEventRunner()->Stop();
383 if (eventThread_.joinable()) {
384 eventThread_.join();
385 }
386 eventHandler_ = nullptr;
387 } else {
388 HILOGE("eventHandler_ is nullptr");
389 }
390
391 // stop callback task
392 if (callbackEventHandler_ != nullptr) {
393 callbackEventHandler_->GetEventRunner()->Stop();
394 if (callbackEventThread_.joinable()) {
395 callbackEventThread_.join();
396 }
397 callbackEventHandler_ = nullptr;
398 } else {
399 HILOGE("callbackEventHandler_ is nullptr");
400 }
401
402 // stop msg task
403 if (msgEventHandler_ != nullptr) {
404 msgEventHandler_->GetEventRunner()->Stop();
405 if (msgEventThread_.joinable()) {
406 msgEventThread_.join();
407 }
408 msgEventHandler_ = nullptr;
409 }
410
411 // stop new callback task
412 if (callbackEventHandlerNew_ != nullptr) {
413 callbackEventHandlerNew_->GetEventRunner()->Stop();
414 if (callbackEventNewThread_.joinable()) {
415 callbackEventNewThread_.join();
416 }
417 callbackEventHandlerNew_ = nullptr;
418 }
419
420 // release channels
421 std::unordered_set<int32_t> channelIds;
422 for (const auto& entry : channelIdMap_) {
423 for (int32_t id : entry.second) {
424 channelIds.insert(id);
425 }
426 }
427 for (const int32_t id : channelIds) {
428 DeleteChannel(id);
429 }
430 dlclose(dllHandle_);
431 dllHandle_ = nullptr;
432 dmsFileAdapetr_.SetFileSchema = nullptr;
433 Shutdown(serverSocketId_);
434 Reset();
435 HILOGI("end");
436 }
437
GetDmsInteractiveAdapterProxy()438 int32_t ChannelManager::GetDmsInteractiveAdapterProxy()
439 {
440 HILOGI("Get remote dms interactive adapter proxy.");
441 std::lock_guard<std::mutex> autoLock(dmsAdapetrLock_);
442 #if (defined(__aarch64__) || defined(__x86_64__))
443 char resolvedPath[100] = "/system/lib64/libdms_interactive_adapter.z.so";
444 #else
445 char resolvedPath[100] = "/system/lib/libdms_interactive_adapter.z.so";
446 #endif
447 int32_t (*GetSoftbusFile)(ISoftbusFileAdpater &dmsFileHandle) = nullptr;
448
449 dllHandle_ = dlopen(resolvedPath, RTLD_LAZY);
450 if (dllHandle_ == nullptr) {
451 HILOGE("Open dms interactive adapter shared object fail, resolvedPath [%{public}s].", resolvedPath);
452 return NOT_FIND_SERVICE_REGISTRY;
453 }
454
455 int32_t ret = ERR_OK;
456 do {
457 GetSoftbusFile = reinterpret_cast<int32_t (*)(ISoftbusFileAdpater &dmsFileHandle)>(
458 dlsym(dllHandle_, "GetSoftbusFile"));
459 if (GetSoftbusFile == nullptr) {
460 HILOGE("Link the GetDmsInteractiveAdapter symbol in dms interactive adapter fail.");
461 ret = NOT_FIND_SERVICE_REGISTRY;
462 break;
463 }
464
465 if (GetSoftbusFile(dmsFileAdapetr_)) {
466 HILOGE("Init remote dms interactive adapter proxy fail, ret %{public}d.", ret);
467 ret = INVALID_PARAMETERS_ERR;
468 break;
469 }
470 ret = ERR_OK;
471 } while (false);
472
473 if (ret != ERR_OK) {
474 HILOGE("Get remote dms interactive adapter proxy fail, dlclose handle.");
475 dlclose(dllHandle_);
476 dllHandle_ = nullptr;
477 }
478 return ret;
479 }
480
Reset()481 void ChannelManager::Reset()
482 {
483 HILOGI("reset channel manager");
484 serverSocketId_ = -1;
485 ownerName_ = "";
486 nextIds_ = {
487 { ChannelDataType::MESSAGE, MESSAGE_START_ID },
488 { ChannelDataType::BYTES, BYTES_START_ID },
489 { ChannelDataType::VIDEO_STREAM, STREAM_START_ID },
490 { ChannelDataType::FILE, FILE_START_ID }
491 };
492 }
493
GetVersion()494 int32_t ChannelManager::GetVersion()
495 {
496 return VERSION_;
497 }
498
CreateServerChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)499 int32_t ChannelManager::CreateServerChannel(const std::string& channelName,
500 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
501 {
502 HILOGI("start to creat server channel waiting for connect");
503 std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
504 if (!info) {
505 HILOGE("Create server channel failed");
506 return CREATE_SERVER_CHANNEL_FAILED;
507 }
508 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
509 channelIdMap_[channelName].push_back(info->channelId);
510 channelInfoMap_.emplace(info->channelId, std::move(*info));
511 // save file channel
512 fileChannelId_.store(info->channelId);
513 HILOGI("end");
514 return info->channelId;
515 }
516
CreateClientChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)517 int32_t ChannelManager::CreateClientChannel(const std::string& channelName,
518 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
519 {
520 HILOGI("start to creat client channel to connect other");
521 std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
522 if (!info) {
523 HILOGE("Create client channel failed");
524 return CREATE_CLIENT_CHANNEL_FAILED;
525 }
526 int32_t ret = RegisterSocket(*info, dataType);
527 HILOGI("end");
528 return ret == ERR_OK ? info->channelId : ret;
529 };
530
CreateBaseChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)531 std::optional<ChannelInfo> ChannelManager::CreateBaseChannel(const std::string& channelName,
532 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
533 {
534 HILOGI("start create base channel, dataType=%{public}d, name=%{public}s",
535 static_cast<int32_t>(dataType), channelName.c_str());
536 int32_t channelId = GenerateNextId(dataType);
537 if (!isValidChannelId(channelId)) {
538 HILOGE("Get channel id failed, id=%{public}d", channelId);
539 return std::nullopt;
540 }
541 ChannelInfo info;
542 info.channelId = channelId;
543 info.channelName = channelName;
544 info.status = ChannelStatus::UNCONNECTED;
545 info.dataType = dataType;
546 info.peerInfo = peerInfo;
547 return info;
548 }
549
GenerateNextId(const ChannelDataType dataType)550 int32_t ChannelManager::GenerateNextId(const ChannelDataType dataType)
551 {
552 int32_t channelId = 0;
553 // lock for each type
554 std::lock_guard<std::mutex> typeLock(typeMutex_[dataType]);
555 HILOGI("create socket for %{public}d", static_cast<int32_t>(dataType));
556 channelId = nextIds_[dataType];
557 if (channelId - CHANNEL_ID_GAP * (static_cast<int32_t>(dataType) + 1)
558 >= CHANNEL_ID_GAP) {
559 HILOGE("type %{public}d exceed max channel",
560 static_cast<int32_t>(dataType));
561 return CHANNEL_NUM_EXCEED_LIMIT;
562 }
563 nextIds_[dataType]++;
564 return channelId;
565 }
566
RegisterSocket(ChannelInfo & info,const ChannelDataType dataType)567 int32_t ChannelManager::RegisterSocket(ChannelInfo& info, const ChannelDataType dataType)
568 {
569 int32_t clientSocketId = CreateClientSocket(info.channelName,
570 info.peerInfo.peerName, info.peerInfo.networkId, dataType);
571 if (clientSocketId <= 0) {
572 HILOGE("create socket failed, ret: %{public}d", clientSocketId);
573 return CREATE_SOCKET_FAILED;
574 }
575 // save info to each map
576 {
577 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
578 socketChannelMap_.emplace(clientSocketId, info.channelId);
579 socketStatusMap_.emplace(clientSocketId, ChannelStatus::UNCONNECTED);
580 }
581 HILOGI("register channel name: %{public}s", info.channelName.c_str());
582 {
583 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
584 info.clientSockets.push_back(clientSocketId);
585 info.dataSenderReceivers[clientSocketId] = std::make_unique<DataSenderReceiver>(clientSocketId);
586 channelIdMap_[info.channelName].push_back(info.channelId);
587 channelInfoMap_.emplace(info.channelId, std::move(info));
588 }
589 return ERR_OK;
590 }
591
CreateClientSocket(const std::string & channelName,const std::string & peerName,const std::string & peerNetworkId,const ChannelDataType dataType)592 int32_t ChannelManager::CreateClientSocket(const std::string& channelName,
593 const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType)
594 {
595 HILOGI("start");
596 if (channelName.length() > MAX_CHANNEL_NAME_LENGTH) {
597 HILOGE("channel name too long, %{public}s", channelName.c_str());
598 return -INVALID_CHANNEL_NAME;
599 }
600 // ohos.dtbcollab.dms64_F_64
601 std::string name = SESSION_NAME_PREFIX + ownerName_ +
602 SPLIT_FLAG + CHANNEL_DATATYPE_PREFIX_MAP[dataType] + SPLIT_FLAG + channelName;
603 std::string peerSocketName = SESSION_NAME_PREFIX + peerName;
604 HILOGI("self-name: %{public}s, peerName: %{public}s", name.c_str(), peerSocketName.c_str());
605 SocketInfo socketInfo = {
606 .name = const_cast<char*>(name.c_str()),
607 .peerName = const_cast<char*>(peerSocketName.c_str()),
608 .peerNetworkId = const_cast<char*>(peerNetworkId.c_str()),
609 .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
610 .dataType = CHANNEL_SOFTBUS_DATATYPE_MAP[dataType]
611 };
612 int32_t sessionId = Socket(socketInfo);
613 HILOGI("finish, socket session id: %{public}d", sessionId);
614 return sessionId;
615 }
616
isValidChannelId(const int32_t channelId)617 bool ChannelManager::isValidChannelId(const int32_t channelId)
618 {
619 return channelId > CHANNEL_ID_GAP && channelId <= (FILE_START_ID + CHANNEL_ID_GAP);
620 }
621
DeleteChannel(const int32_t channelId)622 int32_t ChannelManager::DeleteChannel(const int32_t channelId)
623 {
624 HILOGI("start delete channel");
625 if (!isValidChannelId(channelId)) {
626 HILOGE("invalid channel id");
627 return INVALID_CHANNEL_ID;
628 }
629 ClearRegisterListener(channelId);
630 ClearSendTask(channelId);
631 ClearRegisterChannel(channelId);
632 ClearRegisterSocket(channelId);
633 HILOGI("end delete channel");
634 return channelId;
635 }
636
ClearRegisterChannel(const int32_t channelId)637 void ChannelManager::ClearRegisterChannel(const int32_t channelId)
638 {
639 HILOGI("start clear channel info, channelId=%{public}d", channelId);
640 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
641 std::string channelName;
642 auto infoIt = channelInfoMap_.find(channelId);
643 if (infoIt != channelInfoMap_.end()) {
644 channelName = infoIt->second.channelName;
645 }
646 channelInfoMap_.erase(channelId);
647
648 auto idIt = channelIdMap_.find(channelName);
649 if (idIt != channelIdMap_.end()) {
650 idIt->second.erase(std::remove(idIt->second.begin(), idIt->second.end(), channelId), idIt->second.end());
651 }
652 }
653
ClearRegisterListener(const int32_t channelId)654 void ChannelManager::ClearRegisterListener(const int32_t channelId)
655 {
656 HILOGI("start release listener, channelId=%{public}d", channelId);
657 std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
658 listenersMap_.erase(channelId);
659 }
660
ClearRegisterSocket(const int32_t channelId)661 void ChannelManager::ClearRegisterSocket(const int32_t channelId)
662 {
663 HILOGI("start release socket, channelId=%{public}d", channelId);
664 std::vector<int32_t> socketIds;
665 {
666 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
667 if (!socketChannelMap_.empty()) {
668 for (auto&& socket : socketChannelMap_) {
669 if (socket.second == channelId) {
670 socketIds.push_back(socket.first);
671 }
672 }
673 }
674 for (const auto socketId : socketIds) {
675 HILOGI("start release socket, %{public}d", socketId);
676 socketChannelMap_.erase(socketId);
677 socketStatusMap_.erase(socketId);
678 }
679 }
680 HILOGI("start to clear socket");
681 for (const auto socketId : socketIds) {
682 Shutdown(socketId);
683 }
684 }
685
ClearSendTask(int32_t channelId)686 void ChannelManager::ClearSendTask(int32_t channelId)
687 {
688 HILOGI("clear send task for=%{public}d", channelId);
689 if (eventHandler_ != nullptr) {
690 eventHandler_->RemoveTask(std::to_string(channelId));
691 }
692 }
693
RegisterChannelListener(const int32_t channelId,const std::shared_ptr<IChannelListener> listener)694 int32_t ChannelManager::RegisterChannelListener(const int32_t channelId,
695 const std::shared_ptr<IChannelListener> listener)
696 {
697 HILOGI("start register listener, channelId=%{public}d", channelId);
698 if (listener == nullptr) {
699 HILOGE("listener empty");
700 return INVALID_LISTENER;
701 }
702 std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
703 auto listenIt = listenersMap_.find(channelId);
704 if (listenIt == listenersMap_.end()) {
705 listenersMap_[channelId].emplace_back(listener);
706 return ERR_OK;
707 }
708 CleanInvalidListener(listenIt->second);
709 auto it = std::find_if(listenIt->second.begin(), listenIt->second.end(),
710 [&listener](const std::weak_ptr<IChannelListener> weakListener) {
711 if (auto ptr = weakListener.lock()) {
712 return listener == ptr;
713 }
714 return false;
715 });
716 if (it != listenIt->second.end()) {
717 HILOGI("already exist listener");
718 return ERR_OK;
719 }
720 listenIt->second.emplace_back(listener);
721 return ERR_OK;
722 }
723
CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>> & listeners)724 inline void ChannelManager::CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners)
725 {
726 listeners.erase(std::remove_if(listeners.begin(), listeners.end(),
727 [](const std::weak_ptr<IChannelListener> listener) {
728 return listener.expired();
729 }),
730 listeners.end());
731 }
732
ConnectChannel(const int32_t channelId)733 int32_t ChannelManager::ConnectChannel(const int32_t channelId)
734 {
735 HILOGI("start to connect channel %{public}d, only allow client", channelId);
736 std::vector<int32_t> socketIds;
737 ChannelDataType dataType;
738 {
739 std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
740 auto infoIt = channelInfoMap_.find(channelId);
741 if (infoIt == channelInfoMap_.end() || infoIt->second.clientSockets.empty()) {
742 HILOGE("invalid channel id");
743 return INVALID_CHANNEL_ID;
744 }
745 dataType = infoIt->second.dataType;
746 socketIds.insert(socketIds.begin(), infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
747 }
748 HILOGI("end");
749 return DoBindSockets(socketIds, dataType);
750 }
751
DoBindSockets(const std::vector<int32_t> & socketIds,const ChannelDataType dataType)752 int32_t ChannelManager::DoBindSockets(const std::vector<int32_t>& socketIds,
753 const ChannelDataType dataType)
754 {
755 HILOGI("start to connect sockets");
756 std::vector<std::future<int32_t>> bindTasks;
757 for (const auto& socketId : socketIds) {
758 if (GetSocketStatus(socketId) == ChannelStatus::UNCONNECTED) {
759 bindTasks.emplace_back(std::async(std::launch::async, [this, socketId, dataType]() {
760 return BindSocket(socketId, dataType);
761 }));
762 }
763 }
764 if (bindTasks.empty()) {
765 return ERR_OK;
766 }
767 for (auto&& task : bindTasks) {
768 int32_t ret = task.get();
769 HILOGI("bind task ret=%{public}d", ret);
770 if (ret == ERR_OK) {
771 return ERR_OK;
772 }
773 }
774
775 return CONNECT_CHANNEL_FAILED;
776 }
777
GetSocketStatus(const int32_t socketId)778 ChannelStatus ChannelManager::GetSocketStatus(const int32_t socketId)
779 {
780 std::shared_lock<std::shared_mutex> readLock(socketMutex_);
781 auto it = socketStatusMap_.find(socketId);
782 if (it != socketStatusMap_.end()) {
783 return it->second;
784 }
785 return ChannelStatus::CONNECTED;
786 }
787
BindSocket(const int32_t socketId,const ChannelDataType dataType)788 int32_t ChannelManager::BindSocket(const int32_t socketId, const ChannelDataType dataType)
789 {
790 QosSpeedType speedType = CHANNEL_DATATYPE_SPEED_MAP[dataType];
791 const QosTV* qos = qos_config[speedType];
792 const uint32_t qosCount = qos_speed_config[speedType];
793 int32_t ret = ERR_OK;
794 int retryCount = 0;
795 HILOGI("start to bind socket, id:%{public}d, speed:%{public}d", socketId, speedType);
796 do {
797 ret = Bind(socketId, qos, qosCount, &channelManagerListener);
798 if (ret == ERR_OK) {
799 break;
800 }
801 if (ret != SOFTBUS_LANE_WIFI_NOT_ONLINE) {
802 HILOGE("bind failed, err=%{public}d", ret);
803 return ret;
804 }
805 if (retryCount * BIND_RETRY_INTERVAL >= MAX_BIND_RETRY_TIME) {
806 HILOGE("bind failed after max retry time %{public}d ms", MAX_BIND_RETRY_TIME);
807 return ret;
808 }
809 HILOGI("bind failed, retrying after %{public}d ms, retry %{public}d", BIND_RETRY_INTERVAL, retryCount + 1);
810 usleep(BIND_RETRY_INTERVAL * MS_TO_US);
811 retryCount++;
812 } while (retryCount < MAX_RETRY_TIMES);
813 HILOGI("bind end");
814 if (ret != ERR_OK) {
815 HILOGE("client bind failed, ret: %{public}d", ret);
816 return BIND_SOCKET_FAILED;
817 }
818 if (dataType == ChannelDataType::FILE) {
819 if (dmsFileAdapetr_.SetFileSchema == nullptr) {
820 HILOGE("SetFileSchema is null.");
821 return INVALID_PARAMETERS_ERR;
822 }
823 ret = dmsFileAdapetr_.SetFileSchema(socketId);
824 }
825 if (ret != ERR_OK) {
826 HILOGE("register %{public}d file schema failed", socketId);
827 return REGISTER_FILE_SCHEMA_FAILED;
828 }
829 SetSocketStatus(socketId, ChannelStatus::CONNECTED);
830 return ret;
831 }
832
SetSocketStatus(const int32_t socketId,const ChannelStatus status)833 int32_t ChannelManager::SetSocketStatus(const int32_t socketId, const ChannelStatus status)
834 {
835 HILOGI("start set socket id:%{public}d status %{public}d", socketId, static_cast<int32_t>(status));
836 int32_t channelId = 0;
837 {
838 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
839 auto it = socketStatusMap_.find(socketId);
840 if (it == socketStatusMap_.end()) {
841 HILOGE("no valid socket in socketStatusMap");
842 return INVALID_SOCKET_ID;
843 }
844 it->second = status;
845 auto channelIt = socketChannelMap_.find(socketId);
846 if (channelIt == socketChannelMap_.end()) {
847 HILOGE("no valid socket in socketChannelMap");
848 return INVALID_SOCKET_ID;
849 }
850 channelId = channelIt->second;
851 }
852 auto func = [channelId, this]() {
853 UpdateChannelStatus(channelId);
854 };
855 return PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
856 }
857
UpdateChannelStatus(const int32_t channelId)858 int32_t ChannelManager::UpdateChannelStatus(const int32_t channelId)
859 {
860 HILOGI("update channel id=%{public}d", channelId);
861 std::vector<int32_t> socketIds;
862 ChannelStatus curStatus = ChannelStatus::UNCONNECTED;
863 {
864 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
865 auto infoIt = channelInfoMap_.find(channelId);
866 if (infoIt == channelInfoMap_.end()) {
867 HILOGE("no valid channelInfo for %{public}d", channelId);
868 return INVALID_CHANNEL_ID;
869 }
870 for (const auto socket : infoIt->second.clientSockets) {
871 socketIds.push_back(socket);
872 }
873 curStatus = infoIt->second.status;
874 }
875
876 ChannelStatus newStatus = ChannelStatus::UNCONNECTED;
877 for (const auto id : socketIds) {
878 if (GetSocketStatus(id) == ChannelStatus::CONNECTED) {
879 newStatus = ChannelStatus::CONNECTED;
880 break;
881 }
882 }
883 HILOGI("curStatus:%{public}d, newStatus:%{public}d",
884 static_cast<int32_t>(curStatus), static_cast<int32_t>(newStatus));
885 if (newStatus != curStatus) {
886 return SetChannelStatus(channelId, newStatus);
887 }
888 return ERR_OK;
889 }
890
SetChannelStatus(const int32_t channelId,const ChannelStatus status)891 int32_t ChannelManager::SetChannelStatus(const int32_t channelId, const ChannelStatus status)
892 {
893 HILOGI("set channel:%{public}d, status:%{public}d", channelId, static_cast<int32_t>(status));
894 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
895 auto infoIt = channelInfoMap_.find(channelId);
896 if (infoIt == channelInfoMap_.end()) {
897 HILOGE("no valid channelInfo for %{public}d", channelId);
898 return INVALID_CHANNEL_ID;
899 }
900 infoIt->second.status = status;
901 return ERR_OK;
902 }
903
OnSocketConnected(int32_t socketId,const PeerSocketInfo & info)904 void ChannelManager::OnSocketConnected(int32_t socketId, const PeerSocketInfo& info)
905 {
906 if (socketId <= 0) {
907 HILOGE("invalid socketId: %{public}d", socketId);
908 return;
909 }
910 HILOGI("socket %{public}d binded", socketId);
911 std::optional<std::string> channelNameOpt = GetChannelNameFromSocket(info.name);
912 if (!channelNameOpt) {
913 HILOGE("error socket name, %{public}s", info.name);
914 return;
915 }
916 auto& channelName = *channelNameOpt;
917 std::optional<ChannelDataType> channelType = GetChannelDataTypeFromName(channelName);
918 if (!channelType) {
919 HILOGE("error channel name, %{public}s", channelName.c_str());
920 return;
921 }
922 // remove datatype flag
923 constexpr int32_t namePrefix = 2;
924 channelName = channelName.substr(namePrefix);
925 int32_t channelId = GetChannelId(channelName, *channelType);
926 if (!isValidChannelId(channelId)) {
927 HILOGE("invalid channelid=%{public}d with channelName %{public}s", channelId, channelName.c_str());
928 return;
929 }
930 auto func = [socketId, channelId, this]() {
931 UpdateChannel(socketId, channelId);
932 };
933 PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
934 HILOGI("add update channel task into handler");
935 }
936
UpdateChannel(const int32_t socketId,const int32_t channelId)937 int32_t ChannelManager::UpdateChannel(const int32_t socketId, const int32_t channelId)
938 {
939 int32_t ret = RegisterSocket(socketId, channelId);
940 if (ret != ERR_OK) {
941 HILOGE("failed to save binded socket to matching channel");
942 DoErrorCallback(channelId, ret);
943 return ret;
944 }
945 ret = SetSocketStatus(socketId, ChannelStatus::CONNECTED);
946 if (ret != ERR_OK) {
947 HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
948 DoErrorCallback(channelId, ret);
949 return ret;
950 }
951 DoConnectCallback(channelId);
952 return ret;
953 }
954
GetChannelNameFromSocket(const std::string & socketName)955 std::optional<std::string> ChannelManager::GetChannelNameFromSocket(const std::string& socketName)
956 {
957 size_t splitPos = socketName.find(SPLIT_FLAG, CHANNEL_NAME_PREFIX_LENGTH);
958 if (splitPos == std::string::npos) {
959 HILOGE("peer socket name invalid");
960 return std::nullopt;
961 }
962 return socketName.substr(splitPos + SPLIT_FLAG.length());
963 }
964
GetChannelDataTypeFromName(const std::string & channelName)965 std::optional<ChannelDataType> ChannelManager::GetChannelDataTypeFromName(const std::string& channelName)
966 {
967 std::string prefix = channelName.substr(0, 1);
968 for (auto&& dataType : CHANNEL_DATATYPE_PREFIX_MAP) {
969 if (prefix == dataType.second) {
970 return dataType.first;
971 }
972 }
973 return std::nullopt;
974 }
975
GetChannelId(const std::string & channelName,const ChannelDataType dataType)976 int32_t ChannelManager::GetChannelId(const std::string& channelName, const ChannelDataType dataType)
977 {
978 HILOGI("channelName: %{public}s, dataType: %{public}d", channelName.c_str(),
979 static_cast<int32_t>(dataType));
980 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
981 auto it = channelIdMap_.find(channelName);
982 if (it == channelIdMap_.end()) {
983 HILOGE("no valid channel exist");
984 return INVALID_CHANNEL_NAME;
985 }
986 for (const auto channelId : it->second) {
987 auto infoIt = channelInfoMap_.find(channelId);
988 if (infoIt == channelInfoMap_.end()) {
989 HILOGE("no valid channel exist");
990 return INVALID_CHANNEL_ID;
991 }
992 // find matching dataType
993 if (infoIt->second.dataType == dataType) {
994 return infoIt->second.channelId;
995 }
996 }
997 HILOGE("no matching channel");
998 return NO_SUCH_CHANNEL;
999 }
1000
RegisterSocket(const int32_t socketId,const int32_t channelId)1001 int32_t ChannelManager::RegisterSocket(const int32_t socketId, const int32_t channelId)
1002 {
1003 // update channelInfo
1004 HILOGI("register socket with channel, channelId=%{public}d, socketId=%{public}d", channelId, socketId);
1005 ChannelDataType dataType = ChannelDataType::BYTES;
1006 {
1007 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
1008 auto infoIt = channelInfoMap_.find(channelId);
1009 if (infoIt == channelInfoMap_.end()) {
1010 HILOGE("no valid channel");
1011 return INVALID_CHANNEL_ID;
1012 }
1013 dataType = infoIt->second.dataType;
1014 infoIt->second.clientSockets.push_back(socketId);
1015 infoIt->second.dataSenderReceivers[socketId] = std::make_unique<DataSenderReceiver>(socketId);
1016 }
1017 // update socket
1018 {
1019 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
1020 socketChannelMap_[socketId] = channelId;
1021 socketStatusMap_[socketId] = ChannelStatus::CONNECTED;
1022 }
1023 if (dataType == ChannelDataType::FILE) {
1024 HILOGI("file socket, regist softbus file schema");
1025 if (dmsFileAdapetr_.SetFileSchema == nullptr) {
1026 HILOGE("SetFileSchema is null.");
1027 return INVALID_PARAMETERS_ERR;
1028 }
1029 int32_t ret = dmsFileAdapetr_.SetFileSchema(socketId);
1030 if (ret != ERR_OK) {
1031 HILOGE("register %{public}d file schema failed", socketId);
1032 return REGISTER_FILE_SCHEMA_FAILED;
1033 }
1034 }
1035 return ERR_OK;
1036 }
1037
1038 template <typename Func, typename... Args>
NotifyListeners(const int32_t channelId,Func listenerFunc,const AppExecFwk::EventQueue::Priority priority,Args &&...args)1039 void ChannelManager::NotifyListeners(const int32_t channelId, Func listenerFunc,
1040 const AppExecFwk::EventQueue::Priority priority, Args&&... args)
1041 {
1042 std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1043 auto it = listenersMap_.find(channelId);
1044 if (it == listenersMap_.end() || it->second.empty()) {
1045 HILOGE("no matching listener to %{public}d", channelId);
1046 return;
1047 }
1048 auto& listeners = it->second;
1049 for (const auto& listener : listeners) {
1050 if (auto ptr = listener.lock()) {
1051 auto func = [ptr, listenerFunc, channelId, args...]() {
1052 (ptr.get()->*listenerFunc)(channelId, std::forward<Args>(args)...);
1053 };
1054 PostCallbackTask(func, priority);
1055 }
1056 }
1057 }
1058
1059 template <typename Func, typename... Args>
NotifyListenersNew(const int32_t channelId,Func listenerFunc,const AppExecFwk::EventQueue::Priority priority,Args &&...args)1060 void ChannelManager::NotifyListenersNew(const int32_t channelId, Func listenerFunc,
1061 const AppExecFwk::EventQueue::Priority priority, Args&&... args)
1062 {
1063 std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1064 auto it = listenersMap_.find(channelId);
1065 if (it == listenersMap_.end() || it->second.empty()) {
1066 HILOGE("no matching listener to %{public}d", channelId);
1067 return;
1068 }
1069 auto& listeners = it->second;
1070 for (const auto& listener : listeners) {
1071 if (auto ptr = listener.lock()) {
1072 auto func = [ptr, listenerFunc, channelId, args...]() {
1073 (ptr.get()->*listenerFunc)(channelId, std::forward<Args>(args)...);
1074 };
1075 PostCallbackTaskNew(func, priority);
1076 }
1077 }
1078 }
1079
OnSocketError(int32_t socketId,const int32_t errorCode)1080 void ChannelManager::OnSocketError(int32_t socketId, const int32_t errorCode)
1081 {
1082 int32_t channelId = 0;
1083 CHECK_SOCKET_ID(socketId);
1084 CHECK_CHANNEL_ID(socketId, channelId);
1085 DoErrorCallback(channelId, errorCode);
1086 }
1087
DoErrorCallback(const int32_t channelId,const int32_t errorCode)1088 void ChannelManager::DoErrorCallback(const int32_t channelId, const int32_t errorCode)
1089 {
1090 NotifyListeners(channelId, &IChannelListener::OnError,
1091 AppExecFwk::EventQueue::Priority::IMMEDIATE, errorCode);
1092 }
1093
DoConnectCallback(const int32_t channelId)1094 void ChannelManager::DoConnectCallback(const int32_t channelId)
1095 {
1096 NotifyListeners(channelId, &IChannelListener::OnConnect,
1097 AppExecFwk::EventQueue::Priority::IMMEDIATE);
1098 }
1099
OnSocketClosed(int32_t socketId,const ShutdownReason & reason)1100 void ChannelManager::OnSocketClosed(int32_t socketId, const ShutdownReason& reason)
1101 {
1102 int32_t channelId = 0;
1103 CHECK_SOCKET_ID(socketId);
1104 CHECK_CHANNEL_ID(socketId, channelId);
1105 HILOGI("socket %{public}d closed, reason:%{public}d", socketId, reason);
1106 int32_t ret = SetSocketStatus(socketId, ChannelStatus::UNCONNECTED);
1107 if (ret != ERR_OK) {
1108 HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
1109 DoErrorCallback(channelId, ret);
1110 return;
1111 }
1112 // delete channel when all socket shutdown
1113 auto func = [channelId, reason, this]() {
1114 if (GetChannelStatus(channelId) == ChannelStatus::UNCONNECTED) {
1115 DoDisConnectCallback(channelId, reason);
1116 DeleteChannel(channelId);
1117 }
1118 };
1119 PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
1120 }
1121
GetChannelId(const int32_t socketId)1122 int32_t ChannelManager::GetChannelId(const int32_t socketId)
1123 {
1124 std::shared_lock<std::shared_mutex> readLock(socketMutex_);
1125 auto it = socketChannelMap_.find(socketId);
1126 if (it == socketChannelMap_.end()) {
1127 HILOGE("no proper channelId to %{public}d", socketId);
1128 return INVALID_SOCKET_ID;
1129 }
1130 return it->second;
1131 }
1132
DoDisConnectCallback(const int32_t channelId,const ShutdownReason & reason)1133 void ChannelManager::DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason)
1134 {
1135 NotifyListeners(channelId, &IChannelListener::OnDisConnect,
1136 AppExecFwk::EventQueue::Priority::IMMEDIATE, reason);
1137 }
1138
GetChannelStatus(const int32_t channelId)1139 ChannelStatus ChannelManager::GetChannelStatus(const int32_t channelId)
1140 {
1141 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
1142 auto it = channelInfoMap_.find(channelId);
1143 if (it != channelInfoMap_.end()) {
1144 return it->second.status;
1145 }
1146 return ChannelStatus::UNCONNECTED;
1147 }
1148
1149 template <typename Func, typename... Args>
DoSendData(const int32_t channelId,Func doSendFunc,Args &&...args)1150 int32_t ChannelManager::DoSendData(const int32_t channelId, Func doSendFunc, Args&&... args)
1151 {
1152 HILOGI("start to send data");
1153 int32_t socketId = GetValidSocket(channelId);
1154 if (socketId <= 0) {
1155 HILOGE("no avaliable sockets, %{public}d", channelId);
1156 return NO_CONNECTED_SOCKET_ID;
1157 }
1158 int32_t ret = ERR_OK;
1159 {
1160 std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1161 auto infoIt = channelInfoMap_.find(channelId);
1162 if (infoIt == channelInfoMap_.end()) {
1163 HILOGE("no valid channel info exist");
1164 DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1165 return INVALID_CHANNEL_ID;
1166 }
1167 auto socketIt = infoIt->second.dataSenderReceivers.find(socketId);
1168 if (socketIt == infoIt->second.dataSenderReceivers.end()) {
1169 HILOGE("no valid socket");
1170 DoErrorCallback(channelId, INVALID_SOCKET_ID);
1171 return INVALID_SOCKET_ID;
1172 }
1173 auto& senderReceiver = *(socketIt->second);
1174 ret = (senderReceiver.*doSendFunc)(std::forward<Args>(args)...);
1175 }
1176 if (ret != ERR_OK) {
1177 HILOGE("failed send data, %{public}d", ret);
1178 DoErrorCallback(channelId, ret);
1179 return ret;
1180 }
1181 return ERR_OK;
1182 }
1183
SendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1184 int32_t ChannelManager::SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data)
1185 {
1186 if (!isValidChannelId(channelId) || data == nullptr) {
1187 HILOGE("invalid channel id. %{public}d", channelId);
1188 return INVALID_CHANNEL_ID;
1189 }
1190 HILOGI("start to send bytes, %{public}u", static_cast<uint32_t>(data->Size()));
1191 auto func = [channelId, data, this]() {
1192 DoSendBytes(channelId, data);
1193 };
1194 int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1195 std::to_string(channelId));
1196 if (ret != ERR_OK) {
1197 HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1198 return ret;
1199 }
1200 HILOGI("send bytes task added to handler");
1201 return ERR_OK;
1202 }
1203
DoSendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1204 inline int32_t ChannelManager::DoSendBytes(const int32_t channelId,
1205 const std::shared_ptr<AVTransDataBuffer>& data)
1206 {
1207 HILOGI("start to send bytes");
1208 return DoSendData(channelId, &DataSenderReceiver::SendBytesData, data);
1209 }
1210
GetValidSocket(const int32_t channelId)1211 int32_t ChannelManager::GetValidSocket(const int32_t channelId)
1212 {
1213 std::vector<int32_t> socketIds;
1214 {
1215 std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1216 auto infoIt = channelInfoMap_.find(channelId);
1217 if (infoIt == channelInfoMap_.end() || infoIt->second.status == ChannelStatus::UNCONNECTED) {
1218 HILOGE("invalid channelId, %{public}d", channelId);
1219 return -1;
1220 }
1221 socketIds.insert(socketIds.begin(),
1222 infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
1223 }
1224
1225 for (const auto socketId : socketIds) {
1226 if (GetSocketStatus(socketId) == ChannelStatus::CONNECTED) {
1227 return socketId;
1228 }
1229 }
1230 return -1;
1231 }
1232
SendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1233 int32_t ChannelManager::SendStream(const int32_t channelId,
1234 const std::shared_ptr<AVTransStreamData>& data)
1235 {
1236 if (!isValidChannelId(channelId) || data == nullptr) {
1237 HILOGE("invalid channel id");
1238 DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1239 return INVALID_CHANNEL_ID;
1240 }
1241 HILOGI("start to send stream");
1242 auto func = [=]() {
1243 DoSendStream(channelId, data);
1244 };
1245 int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1246 std::to_string(channelId));
1247 if (ret != ERR_OK) {
1248 HILOGE("failed to add send stream task, ret=%{public}d", ret);
1249 return POST_TASK_FAILED;
1250 }
1251 HILOGI("send stream task added to handler");
1252 return ERR_OK;
1253 }
1254
DoSendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1255 int32_t ChannelManager::DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1256 {
1257 HILOGI("start to send stream");
1258 return DoSendData(channelId, &DataSenderReceiver::SendStreamData, data);
1259 }
1260
SendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1261 int32_t ChannelManager::SendMessage(const int32_t channelId,
1262 const std::shared_ptr<AVTransDataBuffer>& data)
1263 {
1264 if (!isValidChannelId(channelId) || data == nullptr) {
1265 HILOGE("invalid channel id. %{public}d", channelId);
1266 return INVALID_CHANNEL_ID;
1267 }
1268 HILOGI("start to send message, %{public}u", static_cast<uint32_t>(data->Size()));
1269 auto func = [channelId, data, this]() {
1270 DoSendMessage(channelId, data);
1271 };
1272 int32_t ret = PostMsgTask(func, AppExecFwk::EventQueue::Priority::HIGH);
1273 if (ret != ERR_OK) {
1274 HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1275 return ret;
1276 }
1277 HILOGD("send message task added to handler");
1278 return ERR_OK;
1279 }
1280
DoSendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1281 int32_t ChannelManager::DoSendMessage(const int32_t channelId,
1282 const std::shared_ptr<AVTransDataBuffer>& data)
1283 {
1284 HILOGI("start to send message");
1285 return DoSendData(channelId, &DataSenderReceiver::SendMessageData, data);
1286 }
1287
SendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1288 int32_t ChannelManager::SendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1289 const std::vector<std::string>& dFiles)
1290 {
1291 HILOGI("start to send files, %{public}d", channelId);
1292 if (!isValidChannelId(channelId) || sFiles.empty() || dFiles.empty()) {
1293 HILOGE("invalid channel id. %{public}d or empty sfiles", channelId);
1294 return INVALID_PARAMETERS_ERR;
1295 }
1296 if (sFiles.size() != dFiles.size() || sFiles.size() > MAX_FILE_COUNT) {
1297 HILOGE("src size:%{public}d, dst size:%{public}d illegal",
1298 static_cast<int32_t>(sFiles.size()),
1299 static_cast<int32_t>(dFiles.size()));
1300 }
1301 int32_t ret = ERR_OK;
1302 auto func = [channelId, sFiles, dFiles, this]() {
1303 DoSendFile(channelId, sFiles, dFiles);
1304 };
1305 ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW);
1306 if (ret != ERR_OK) {
1307 HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1308 return ret;
1309 }
1310 HILOGI("send files task added to handler");
1311 return ERR_OK;
1312 }
1313
DoSendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1314 int32_t ChannelManager::DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1315 const std::vector<std::string>& dFiles)
1316 {
1317 HILOGI("start to do send files");
1318 return DoSendData(channelId, &DataSenderReceiver::SendFileData, sFiles, dFiles);
1319 }
1320
OnBytesReceived(int32_t socketId,const void * data,const uint32_t dataLen)1321 void ChannelManager::OnBytesReceived(int32_t socketId,
1322 const void* data, const uint32_t dataLen)
1323 {
1324 int32_t channelId = 0;
1325 CHECK_SOCKET_ID(socketId);
1326 CHECK_CHANNEL_ID(socketId, channelId);
1327 CHECK_DATA_NULL(socketId, data, OnError);
1328 HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1329 std::shared_ptr<AVTransDataBuffer> packedData = ProcessRecvData(channelId, socketId, data, dataLen);
1330 if (!packedData) {
1331 return;
1332 }
1333 DoBytesReceiveCallback(channelId, packedData);
1334 }
1335
ProcessRecvData(const int32_t channelId,const int32_t socketId,const void * data,const uint32_t dataLen)1336 std::shared_ptr<AVTransDataBuffer> ChannelManager::ProcessRecvData(const int32_t channelId,
1337 const int32_t socketId, const void* data, const uint32_t dataLen)
1338 {
1339 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
1340 const uint8_t* header = static_cast<const uint8_t*>(data);
1341 auto infoIt = channelInfoMap_.find(channelId);
1342 if (infoIt == channelInfoMap_.end()) {
1343 DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1344 return nullptr;
1345 }
1346
1347 int32_t ret = infoIt->second.dataSenderReceivers[socketId]->PackRecvPacketData(header, dataLen);
1348 if (ret != ERR_OK) {
1349 HILOGE("pack recv data failed");
1350 DoErrorCallback(channelId, ret);
1351 return nullptr;
1352 }
1353 return infoIt->second.dataSenderReceivers[socketId]->GetPacketedData();
1354 }
1355
DoBytesReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1356 void ChannelManager::DoBytesReceiveCallback(const int32_t channelId,
1357 const std::shared_ptr<AVTransDataBuffer>& buffer)
1358 {
1359 NotifyListeners(channelId, &IChannelListener::OnBytes,
1360 AppExecFwk::EventQueue::Priority::LOW, buffer);
1361 }
1362
OnMessageReceived(int32_t socketId,const void * data,const uint32_t dataLen)1363 void ChannelManager::OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen)
1364 {
1365 HILOGI("data len = %{public}d", dataLen);
1366 if (dataLen > MAX_LEN) {
1367 HILOGE("dataLen is too long");
1368 return;
1369 }
1370 int32_t channelId = 0;
1371 CHECK_SOCKET_ID(socketId);
1372 CHECK_CHANNEL_ID(socketId, channelId);
1373 CHECK_DATA_NULL(socketId, data, OnError);
1374 HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1375 std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(dataLen);
1376 int32_t ret = memcpy_s(buffer->Data(),
1377 buffer->Size(), data, dataLen);
1378 if (ret != ERR_OK) {
1379 HILOGE("pack recv data failed");
1380 DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1381 return;
1382 }
1383 DoMessageReceiveCallback(channelId, buffer);
1384 }
1385
DoMessageReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1386 void ChannelManager::DoMessageReceiveCallback(const int32_t channelId,
1387 const std::shared_ptr<AVTransDataBuffer>& buffer)
1388 {
1389 NotifyListenersNew(channelId, &IChannelListener::OnMessage,
1390 AppExecFwk::EventQueue::Priority::HIGH, buffer);
1391 }
1392
OnStreamReceived(int32_t socketId,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)1393 void ChannelManager::OnStreamReceived(int32_t socketId, const StreamData* data,
1394 const StreamData* ext, const StreamFrameInfo* param)
1395 {
1396 int32_t channelId = 0;
1397 CHECK_SOCKET_ID(socketId);
1398 CHECK_CHANNEL_ID(socketId, channelId);
1399 CHECK_DATA_NULL(socketId, data, OnError);
1400 CHECK_DATA_NULL(socketId, ext, OnError);
1401 std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(data->bufLen);
1402 int32_t ret = memcpy_s(buffer->Data(), buffer->Size(), data->buf, data->bufLen);
1403 if (ret != ERR_OK) {
1404 HILOGE("copy stream data failed, %{public}d", socketId);
1405 DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1406 return;
1407 }
1408 AVTransStreamDataExt streamDataExt;
1409 std::shared_ptr<AVTransStreamData> streamData = std::make_shared<AVTransStreamData>(buffer, streamDataExt);
1410 ret = streamData->DeserializeStreamDataExt(ext->buf);
1411 if (ret != ERR_OK) {
1412 HILOGE("deserialize stream ext failed, %{public}d", socketId);
1413 DoErrorCallback(channelId, PARSE_AV_TRANS_STREAM_EXT_FAILED);
1414 return;
1415 }
1416 DoStreamReceiveCallback(channelId, streamData);
1417 }
1418
DoStreamReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1419 void ChannelManager::DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1420 {
1421 NotifyListeners(channelId, &IChannelListener::OnStream,
1422 AppExecFwk::EventQueue::Priority::LOW, data);
1423 }
1424
OnFileEventReceived(int32_t socketId,FileEvent * event)1425 void ChannelManager::OnFileEventReceived(int32_t socketId, FileEvent *event)
1426 {
1427 int32_t channelId = 0;
1428 CHECK_SOCKET_ID(socketId);
1429 // update recv path before onbind
1430 if (event == nullptr) {
1431 HILOGE("socket %{public}d event empty", socketId);
1432 return;
1433 }
1434 if (event->type == FileEventType::FILE_EVENT_RECV_UPDATE_PATH) {
1435 HILOGI("start to set update path func, %{public}d", socketId);
1436 return DispatchProcessFileEvent(fileChannelId_.load(), event);
1437 }
1438 CHECK_CHANNEL_ID(socketId, channelId);
1439 CHECK_DATA_NULL(socketId, event, OnError);
1440 HILOGI("start to dispatch file event, %{public}d", channelId);
1441 DispatchProcessFileEvent(channelId, event);
1442 }
1443
DispatchProcessFileEvent(int32_t channelId,FileEvent * event)1444 void ChannelManager::DispatchProcessFileEvent(int32_t channelId, FileEvent *event)
1445 {
1446 HILOGI("start to dispatch file event");
1447 switch (event->type) {
1448 case FileEventType::FILE_EVENT_SEND_PROCESS:
1449 case FileEventType::FILE_EVENT_SEND_FINISH: {
1450 DealFileSendEvent(channelId, event);
1451 break;
1452 }
1453 case FileEventType::FILE_EVENT_RECV_START:
1454 case FileEventType::FILE_EVENT_RECV_PROCESS:
1455 case FileEventType::FILE_EVENT_RECV_FINISH: {
1456 DealFileRecvEvent(channelId, event);
1457 break;
1458 }
1459 case FileEventType::FILE_EVENT_BUTT:
1460 case FileEventType::FILE_EVENT_SEND_ERROR:
1461 case FileEventType::FILE_EVENT_RECV_ERROR: {
1462 DealFileErrorEvent(channelId, event);
1463 break;
1464 }
1465 case FileEventType::FILE_EVENT_RECV_UPDATE_PATH: {
1466 DealFileUpdatePathEvent(channelId, event);
1467 break;
1468 }
1469 default:
1470 break;
1471 }
1472 }
1473
DealFileSendEvent(int32_t channelId,FileEvent * event)1474 void ChannelManager::DealFileSendEvent(int32_t channelId, FileEvent *event)
1475 {
1476 HILOGI("start to deal file send event, %{public}d", channelId);
1477 FileInfo info;
1478 if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1479 info.commonInfo.eventType = ChannelFileEvent::SEND_PROCESS;
1480 } else {
1481 info.commonInfo.eventType = ChannelFileEvent::SEND_FINISH;
1482 }
1483 info.commonInfo.fileCnt = event->fileCnt;
1484 for (uint32_t i = 0; i < event->fileCnt; ++i) {
1485 info.commonInfo.fileList.push_back(std::string(event->files[i]));
1486 }
1487
1488 FileSendInfo sendInfo;
1489 sendInfo.bytesProcessed = event->bytesProcessed;
1490 sendInfo.bytesTotal = event->bytesTotal;
1491 if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1492 sendInfo.rate = event->rate;
1493 }
1494 info.sendInfo = sendInfo;
1495 DoFileSendCallback(channelId, info);
1496 HILOGI("end");
1497 }
1498
DealFileRecvEvent(int32_t channelId,FileEvent * event)1499 void ChannelManager::DealFileRecvEvent(int32_t channelId, FileEvent *event)
1500 {
1501 HILOGI("start to deal file recv event, %{public}d", channelId);
1502 FileInfo info;
1503 if (event->type == FileEventType::FILE_EVENT_RECV_START) {
1504 info.commonInfo.eventType = ChannelFileEvent::RECV_START;
1505 } else if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1506 info.commonInfo.eventType = ChannelFileEvent::RECV_PROCESS;
1507 } else {
1508 info.commonInfo.eventType = ChannelFileEvent::RECV_FINISH;
1509 }
1510 info.commonInfo.fileCnt = event->fileCnt;
1511 for (uint32_t i = 0; i < event->fileCnt; ++i) {
1512 info.commonInfo.fileList.push_back(std::string(event->files[i]));
1513 }
1514
1515 FileRecvInfo recvInfo;
1516 recvInfo.bytesProcessed = event->bytesProcessed;
1517 recvInfo.bytesTotal = event->bytesTotal;
1518 if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1519 recvInfo.rate = event->rate;
1520 }
1521 info.recvInfo = recvInfo;
1522 DoFileRecvCallback(channelId, info);
1523 HILOGI("end");
1524 }
1525
DealFileErrorEvent(int32_t channelId,FileEvent * event)1526 void ChannelManager::DealFileErrorEvent(int32_t channelId, FileEvent *event)
1527 {
1528 HILOGI("start to deal file error event, %{public}d", channelId);
1529 FileInfo info;
1530 if (event->type == FileEventType::FILE_EVENT_SEND_ERROR) {
1531 info.commonInfo.eventType = ChannelFileEvent::SEND_ERROR;
1532 } else {
1533 info.commonInfo.eventType = ChannelFileEvent::RECV_ERROR;
1534 }
1535 info.commonInfo.fileCnt = event->fileCnt;
1536 for (uint32_t i = 0; i < event->fileCnt; ++i) {
1537 info.commonInfo.fileList.push_back(std::string(event->files[i]));
1538 }
1539
1540 FileErrorInfo errorInfo;
1541 errorInfo.errorCode = event->errorCode;
1542 info.errorInfo = errorInfo;
1543 if (info.commonInfo.eventType == ChannelFileEvent::RECV_ERROR) {
1544 DoFileRecvCallback(channelId, info);
1545 } else {
1546 DoFileSendCallback(channelId, info);
1547 }
1548 HILOGI("end");
1549 }
1550
DealFileUpdatePathEvent(int32_t channelId,FileEvent * event)1551 void ChannelManager::DealFileUpdatePathEvent(int32_t channelId, FileEvent *event)
1552 {
1553 HILOGI("start to deal file update path event, %{public}d", channelId);
1554 event->UpdateRecvPath = GetRecvPath;
1555 HILOGI("end");
1556 }
1557
GetRecvPathFromUser()1558 const char* ChannelManager::GetRecvPathFromUser()
1559 {
1560 HILOGI("get recv path from user");
1561 int32_t channelId = static_cast<int32_t>(fileChannelId_.load());
1562 std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1563 auto it = listenersMap_.find(channelId);
1564 if (it == listenersMap_.end() || it->second.empty()) {
1565 HILOGE("no matching listener to %{public}d", channelId);
1566 return nullptr;
1567 }
1568 auto& listeners = it->second;
1569 for (const auto& listener : listeners) {
1570 if (auto ptr = listener.lock()) {
1571 return ptr->GetRecvPath(channelId);
1572 }
1573 }
1574 return nullptr;
1575 }
1576
DoFileRecvCallback(const int32_t channelId,const FileInfo & info)1577 void ChannelManager::DoFileRecvCallback(const int32_t channelId, const FileInfo& info)
1578 {
1579 NotifyListeners(channelId, &IChannelListener::OnRecvFile,
1580 AppExecFwk::EventQueue::Priority::HIGH, info);
1581 }
1582
DoFileSendCallback(const int32_t channelId,const FileInfo & info)1583 void ChannelManager::DoFileSendCallback(const int32_t channelId, const FileInfo& info)
1584 {
1585 NotifyListeners(channelId, &IChannelListener::OnSendFile,
1586 AppExecFwk::EventQueue::Priority::HIGH, info);
1587 }
1588 }
1589 }
1590