• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #include "channel_manager.h"
17 
18 #include <algorithm>
19 #include <chrono>
20 #include <dlfcn.h>
21 #include <future>
22 #include <sys/prctl.h>
23 #include <unistd.h>
24 #include <unordered_set>
25 
26 #include "dtbcollabmgr_log.h"
27 #include "securec.h"
28 #include "softbus_error_code.h"
29 
30 namespace OHOS {
31 namespace DistributedCollab {
32 IMPLEMENT_SINGLE_INSTANCE(ChannelManager);
33 namespace {
34     static const std::string TAG = "DSchedCollabChannelManager";
35     constexpr int32_t BIND_RETRY_INTERVAL = 500;
36     constexpr int32_t MAX_BIND_RETRY_TIME = 1500;
37     constexpr int32_t MAX_RETRY_TIMES = 3;
38     constexpr int32_t MS_TO_US = 1000;
39     enum class QosSpeedType {
40         HIGH,
41         LOW
42     };
43 
44     static std::map<ChannelDataType, TransDataType> CHANNEL_SOFTBUS_DATATYPE_MAP = {
45         { ChannelDataType::MESSAGE, DATA_TYPE_MESSAGE },
46         { ChannelDataType::BYTES, DATA_TYPE_BYTES },
47         { ChannelDataType::VIDEO_STREAM, DATA_TYPE_VIDEO_STREAM },
48         { ChannelDataType::FILE, DATA_TYPE_FILE }
49     };
50 
51     static std::map<ChannelDataType, QosSpeedType> CHANNEL_DATATYPE_SPEED_MAP = {
52         { ChannelDataType::MESSAGE, QosSpeedType::LOW },
53         { ChannelDataType::BYTES, QosSpeedType::LOW },
54         { ChannelDataType::VIDEO_STREAM, QosSpeedType::HIGH },
55         { ChannelDataType::FILE, QosSpeedType::HIGH }
56     };
57 
58     static const std::string SPLIT_FLAG = "_";
59     static const std::string COLLAB_PGK_NAME = "dms";
60     static const std::string SESSION_NAME_PREFIX = "ohos.dtbcollab.dms";
61 
62     static std::map<ChannelDataType, std::string> CHANNEL_DATATYPE_PREFIX_MAP = {
63         { ChannelDataType::MESSAGE, "M" },
64         { ChannelDataType::BYTES, "B" },
65         { ChannelDataType::VIDEO_STREAM, "V" },
66         { ChannelDataType::FILE, "F" }
67     };
68 
69     static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
70     static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY = 10000;
71     static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY = 2000;
72 
73     static QosTV g_low_qosInfo[] = {
74         { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW },
75         { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY },
76         { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY },
77         { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
78     };
79 
80     static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW = 80 * 1024 * 1024;
81     static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY = 10000;
82     static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY = 2000;
83 
84     static QosTV g_high_qosInfo[] = {
85         { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW },
86         { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY },
87         { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY },
88         { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
89     };
90 
91     static std::map<QosSpeedType, QosTV*> qos_config = {
92         { QosSpeedType::HIGH, g_high_qosInfo },
93         { QosSpeedType::LOW, g_low_qosInfo }
94     };
95 
96     static uint32_t g_lowQosTvParamIndex = static_cast<uint32_t>(sizeof(g_low_qosInfo) / sizeof(QosTV));
97     static uint32_t g_highQosTvParamIndex = static_cast<uint32_t>(sizeof(g_high_qosInfo) / sizeof(QosTV));
98     static std::map<QosSpeedType, uint32_t> qos_speed_config = {
99         { QosSpeedType::HIGH, g_highQosTvParamIndex },
100         { QosSpeedType::LOW, g_lowQosTvParamIndex }
101     };
102 
103 #define CHECK_SOCKET_ID(socketId)                          \
104 do {                                                       \
105     if ((socketId) <= 0) {                                 \
106         HILOGE("invalid socket id, %{public}d", socketId); \
107         return;                                            \
108     }                                                      \
109 } while (0)
110 
111 #define CHECK_CHANNEL_ID(socketId, channelId)                                 \
112 do {                                                                          \
113     (channelId) = GetChannelId(socketId);                                     \
114     if (!isValidChannelId(channelId)) {                                       \
115         HILOGE("invalid socket id %{public}d, can't find channel", socketId); \
116         return;                                                               \
117     }                                                                         \
118 } while (0)
119 
120 #define CHECK_DATA_NULL(socketId, data, errorHandler)                      \
121 do {                                                                       \
122     if ((data) == nullptr) {                                               \
123         HILOGE("receive empty bytes data, socketId=%{public}d", socketId); \
124         (errorHandler)(channelId, RECV_DATA_EMPTY);                        \
125         return;                                                            \
126     }                                                                      \
127 } while (0)
128 }
129 
130 namespace {
131     constexpr int32_t MAX_LEN = 10 * 1024;
132 }
133 
OnSocketConnected(int32_t socket,PeerSocketInfo info)134 static void OnSocketConnected(int32_t socket, PeerSocketInfo info)
135 {
136     ChannelManager::GetInstance().OnSocketConnected(socket, info);
137 }
138 
OnSocketClosed(int32_t socket,ShutdownReason reason)139 static void OnSocketClosed(int32_t socket, ShutdownReason reason)
140 {
141     ChannelManager::GetInstance().OnSocketClosed(socket, reason);
142 }
143 
OnBytesRecv(int32_t socket,const void * data,uint32_t dataLen)144 static void OnBytesRecv(int32_t socket, const void* data, uint32_t dataLen)
145 {
146     ChannelManager::GetInstance().OnBytesReceived(socket, data, dataLen);
147 }
148 
OnMessageRecv(int32_t socket,const void * data,uint32_t dataLen)149 static void OnMessageRecv(int32_t socket, const void* data, uint32_t dataLen)
150 {
151     ChannelManager::GetInstance().OnMessageReceived(socket, data, dataLen);
152 }
153 
OnStreamRecv(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)154 static void OnStreamRecv(int32_t socket, const StreamData* data, const StreamData* ext,
155     const StreamFrameInfo* param)
156 {
157     ChannelManager::GetInstance().OnStreamReceived(socket, data, ext, param);
158 }
159 
OnError(int32_t socket,int32_t errCode)160 static void OnError(int32_t socket, int32_t errCode)
161 {
162     ChannelManager::GetInstance().OnSocketError(socket, errCode);
163 }
164 
OnFileEvent(int32_t socket,FileEvent * event)165 static void OnFileEvent(int32_t socket, FileEvent *event)
166 {
167     ChannelManager::GetInstance().OnFileEventReceived(socket, event);
168 }
169 
GetRecvPath()170 static const char* GetRecvPath()
171 {
172     return ChannelManager::GetInstance().GetRecvPathFromUser();
173 }
174 
175 ISocketListener channelManagerListener = {
176     .OnBind = OnSocketConnected,
177     .OnShutdown = OnSocketClosed,
178     .OnBytes = OnBytesRecv,
179     .OnMessage = OnMessageRecv,
180     .OnStream = OnStreamRecv,
181     .OnFile = OnFileEvent,
182     .OnError = OnError,
183 };
184 
~ChannelManager()185 ChannelManager::~ChannelManager()
186 {
187     DeInit();
188 };
189 
Init(const std::string & ownerName)190 int32_t ChannelManager::Init(const std::string& ownerName)
191 {
192     HILOGI("start init channel manager");
193     if (eventHandler_ != nullptr && callbackEventHandler_ != nullptr && msgEventHandler_ != nullptr &&
194         callbackEventHandlerNew_ != nullptr) {
195         HILOGW("server channel already init");
196         return ERR_OK;
197     }
198     if (serverSocketId_ > 0) {
199         HILOGW("server socket already init");
200         return ERR_OK;
201     }
202     ownerName_ = ownerName;
203 
204     eventThread_ = std::thread(&ChannelManager::StartEvent, this);
205     std::unique_lock<std::mutex> lock(eventMutex_);
206     eventCon_.wait(lock, [this] {
207         return eventHandler_ != nullptr;
208     });
209 
210     callbackEventThread_ = std::thread(&ChannelManager::StartCallbackEvent, this);
211     std::unique_lock<std::mutex> callbackLock(callbackEventMutex_);
212     callbackEventCon_.wait(callbackLock, [this] {
213         return callbackEventHandler_ != nullptr;
214     });
215 
216     msgEventThread_ = std::thread(&ChannelManager::StartMsgEvent, this);
217     std::unique_lock<std::mutex> msgLock(msgEventMutex_);
218     msgEventCon_.wait(msgLock, [this] {
219         return msgEventHandler_ != nullptr;
220     });
221 
222     callbackEventNewThread_ = std::thread(&ChannelManager::StartCallbackEventNew, this);
223     std::unique_lock<std::mutex> callbackNewLock(callbackEventNewMutex_);
224     callbackEventNewCon_.wait(callbackNewLock, [this] {
225         return callbackEventHandlerNew_ != nullptr;
226     });
227 
228     int32_t socketServerId = CreateServerSocket();
229     if (socketServerId <= 0) {
230         HILOGE("create socket failed, ret: %{public}d", socketServerId);
231         return CREATE_SOCKET_FAILED;
232     }
233     int32_t ret = Listen(socketServerId, g_low_qosInfo, g_lowQosTvParamIndex, &channelManagerListener);
234     if (ret != ERR_OK) {
235         HILOGE("service listen failed, ret: %{public}d", ret);
236         return LISTEN_SOCKET_FAILED;
237     }
238     serverSocketId_ = socketServerId;
239     int32_t result = GetDmsInteractiveAdapterProxy();
240     if (result != ERR_OK) {
241         HILOGE("Get remote dms interactive adapter proxy fail, ret %{public}d.", ret);
242     }
243     HILOGI("end");
244     return ERR_OK;
245 }
246 
StartEvent()247 void ChannelManager::StartEvent()
248 {
249     HILOGI("StartEvent start");
250     prctl(PR_SET_NAME, ownerName_.c_str());
251     auto runner = AppExecFwk::EventRunner::Create(false);
252     {
253         std::lock_guard<std::mutex> lock(eventMutex_);
254         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
255     }
256     eventCon_.notify_one();
257     runner->Run();
258     HILOGI("StartEvent end");
259 }
260 
StartCallbackEvent()261 void ChannelManager::StartCallbackEvent()
262 {
263     HILOGI("Start callback event start");
264     std::string callbackName = ownerName_ + "callback";
265     prctl(PR_SET_NAME, callbackName.c_str());
266     auto runner = AppExecFwk::EventRunner::Create(false);
267     {
268         std::lock_guard<std::mutex> lock(callbackEventMutex_);
269         callbackEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
270     }
271     callbackEventCon_.notify_one();
272     runner->Run();
273     HILOGI("callback event end");
274 }
275 
StartCallbackEventNew()276 void ChannelManager::StartCallbackEventNew()
277 {
278     HILOGI("Start new callback event start");
279     std::string callbackName = ownerName_ + "callbackNew";
280     prctl(PR_SET_NAME, callbackName.c_str());
281     auto runner = AppExecFwk::EventRunner::Create(false);
282     {
283         std::lock_guard<std::mutex> lock(callbackEventNewMutex_);
284         callbackEventHandlerNew_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
285     }
286     callbackEventNewCon_.notify_one();
287     runner->Run();
288     HILOGI("new callback event end");
289 }
290 
StartMsgEvent()291 void ChannelManager::StartMsgEvent()
292 {
293     HILOGI("Start msg event start");
294     std::string msgName = ownerName_ + "msg";
295     prctl(PR_SET_NAME, msgName.c_str());
296     auto runner = AppExecFwk::EventRunner::Create(false);
297     {
298         std::lock_guard<std::mutex> lock(msgEventMutex_);
299         msgEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
300     }
301     msgEventCon_.notify_one();
302     runner->Run();
303     HILOGI("msg event end");
304 }
305 
PostTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority,const std::string & name)306 int32_t ChannelManager::PostTask(const AppExecFwk::InnerEvent::Callback& callback,
307     const AppExecFwk::EventQueue::Priority priority, const std::string& name)
308 {
309     if (eventHandler_ == nullptr) {
310         HILOGE("event handler empty");
311         return NULL_EVENT_HANDLER;
312     }
313     if (eventHandler_->PostTask(callback, name, 0, priority)) {
314         return ERR_OK;
315     }
316     HILOGE("add task failed");
317     return POST_TASK_FAILED;
318 }
319 
PostCallbackTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)320 int32_t ChannelManager::PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback,
321     const AppExecFwk::EventQueue::Priority priority)
322 {
323     if (callbackEventHandler_ == nullptr) {
324         HILOGE("callback event handler empty");
325         return NULL_EVENT_HANDLER;
326     }
327     if (callbackEventHandler_->PostTask(callback, priority)) {
328         return ERR_OK;
329     }
330     HILOGE("add callback task failed");
331     return POST_TASK_FAILED;
332 }
333 
PostCallbackTaskNew(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)334 int32_t ChannelManager::PostCallbackTaskNew(const AppExecFwk::InnerEvent::Callback& callback,
335     const AppExecFwk::EventQueue::Priority priority)
336 {
337     if (callbackEventHandlerNew_ == nullptr) {
338         HILOGE("new callback event handler empty");
339         return NULL_EVENT_HANDLER;
340     }
341     if (callbackEventHandlerNew_->PostTask(callback, priority)) {
342         return ERR_OK;
343     }
344     HILOGE("add new callback task failed");
345     return POST_TASK_FAILED;
346 }
347 
PostMsgTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)348 int32_t ChannelManager::PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback,
349     const AppExecFwk::EventQueue::Priority priority)
350 {
351     if (msgEventHandler_ == nullptr) {
352         HILOGE("msg event handler empty");
353         return NULL_EVENT_HANDLER;
354     }
355     if (msgEventHandler_->PostTask(callback, priority)) {
356         return ERR_OK;
357     }
358     HILOGE("add msg task failed");
359     return POST_TASK_FAILED;
360 }
361 
CreateServerSocket()362 int32_t ChannelManager::CreateServerSocket()
363 {
364     HILOGI("start create server socket");
365     std::string sessionName = SESSION_NAME_PREFIX + ownerName_;
366     HILOGI("sessionName: %{public}s, size: %{public}zu", sessionName.c_str(),
367         sessionName.length());
368     SocketInfo info = {
369         .name = const_cast<char*>(sessionName.c_str()),
370         .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
371     };
372     int32_t socket = Socket(info);
373     HILOGI("finish, socket id: %{public}d", socket);
374     return socket;
375 }
376 
DeInit()377 void ChannelManager::DeInit()
378 {
379     HILOGI("start deinit channel manager");
380     // stop all task
381     if (eventHandler_ != nullptr) {
382         eventHandler_->GetEventRunner()->Stop();
383         if (eventThread_.joinable()) {
384             eventThread_.join();
385         }
386         eventHandler_ = nullptr;
387     } else {
388         HILOGE("eventHandler_ is nullptr");
389     }
390 
391     // stop callback task
392     if (callbackEventHandler_ != nullptr) {
393         callbackEventHandler_->GetEventRunner()->Stop();
394         if (callbackEventThread_.joinable()) {
395             callbackEventThread_.join();
396         }
397         callbackEventHandler_ = nullptr;
398     } else {
399         HILOGE("callbackEventHandler_ is nullptr");
400     }
401 
402     // stop msg task
403     if (msgEventHandler_ != nullptr) {
404         msgEventHandler_->GetEventRunner()->Stop();
405         if (msgEventThread_.joinable()) {
406             msgEventThread_.join();
407         }
408         msgEventHandler_ = nullptr;
409     }
410 
411     // stop new callback task
412     if (callbackEventHandlerNew_ != nullptr) {
413         callbackEventHandlerNew_->GetEventRunner()->Stop();
414         if (callbackEventNewThread_.joinable()) {
415             callbackEventNewThread_.join();
416         }
417         callbackEventHandlerNew_ = nullptr;
418     }
419 
420     // release channels
421     std::unordered_set<int32_t> channelIds;
422     for (const auto& entry : channelIdMap_) {
423         for (int32_t id : entry.second) {
424             channelIds.insert(id);
425         }
426     }
427     for (const int32_t id : channelIds) {
428         DeleteChannel(id);
429     }
430     dlclose(dllHandle_);
431     dllHandle_ = nullptr;
432     dmsFileAdapetr_.SetFileSchema = nullptr;
433     Shutdown(serverSocketId_);
434     Reset();
435     HILOGI("end");
436 }
437 
GetDmsInteractiveAdapterProxy()438 int32_t ChannelManager::GetDmsInteractiveAdapterProxy()
439 {
440     HILOGI("Get remote dms interactive adapter proxy.");
441     std::lock_guard<std::mutex> autoLock(dmsAdapetrLock_);
442 #if (defined(__aarch64__) || defined(__x86_64__))
443     char resolvedPath[100] = "/system/lib64/libdms_interactive_adapter.z.so";
444 #else
445     char resolvedPath[100] = "/system/lib/libdms_interactive_adapter.z.so";
446 #endif
447     int32_t (*GetSoftbusFile)(ISoftbusFileAdpater &dmsFileHandle) = nullptr;
448 
449     dllHandle_ = dlopen(resolvedPath, RTLD_LAZY);
450     if (dllHandle_ == nullptr) {
451         HILOGE("Open dms interactive adapter shared object fail, resolvedPath [%{public}s].", resolvedPath);
452         return NOT_FIND_SERVICE_REGISTRY;
453     }
454 
455     int32_t ret = ERR_OK;
456     do {
457         GetSoftbusFile = reinterpret_cast<int32_t (*)(ISoftbusFileAdpater &dmsFileHandle)>(
458             dlsym(dllHandle_, "GetSoftbusFile"));
459         if (GetSoftbusFile == nullptr) {
460             HILOGE("Link the GetDmsInteractiveAdapter symbol in dms interactive adapter fail.");
461             ret = NOT_FIND_SERVICE_REGISTRY;
462             break;
463         }
464 
465         if (GetSoftbusFile(dmsFileAdapetr_)) {
466             HILOGE("Init remote dms interactive adapter proxy fail, ret %{public}d.", ret);
467             ret = INVALID_PARAMETERS_ERR;
468             break;
469         }
470         ret = ERR_OK;
471     } while (false);
472 
473     if (ret != ERR_OK) {
474         HILOGE("Get remote dms interactive adapter proxy fail, dlclose handle.");
475         dlclose(dllHandle_);
476         dllHandle_ = nullptr;
477     }
478     return ret;
479 }
480 
Reset()481 void ChannelManager::Reset()
482 {
483     HILOGI("reset channel manager");
484     serverSocketId_ = -1;
485     ownerName_ = "";
486     nextIds_ = {
487         { ChannelDataType::MESSAGE, MESSAGE_START_ID },
488         { ChannelDataType::BYTES, BYTES_START_ID },
489         { ChannelDataType::VIDEO_STREAM, STREAM_START_ID },
490         { ChannelDataType::FILE, FILE_START_ID }
491     };
492 }
493 
GetVersion()494 int32_t ChannelManager::GetVersion()
495 {
496     return VERSION_;
497 }
498 
CreateServerChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)499 int32_t ChannelManager::CreateServerChannel(const std::string& channelName,
500     const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
501 {
502     HILOGI("start to creat server channel waiting for connect");
503     std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
504     if (!info) {
505         HILOGE("Create server channel failed");
506         return CREATE_SERVER_CHANNEL_FAILED;
507     }
508     std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
509     channelIdMap_[channelName].push_back(info->channelId);
510     channelInfoMap_.emplace(info->channelId, std::move(*info));
511     // save file channel
512     fileChannelId_.store(info->channelId);
513     HILOGI("end");
514     return info->channelId;
515 }
516 
CreateClientChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)517 int32_t ChannelManager::CreateClientChannel(const std::string& channelName,
518     const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
519 {
520     HILOGI("start to creat client channel to connect other");
521     std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
522     if (!info) {
523         HILOGE("Create client channel failed");
524         return CREATE_CLIENT_CHANNEL_FAILED;
525     }
526     int32_t ret = RegisterSocket(*info, dataType);
527     HILOGI("end");
528     return ret == ERR_OK ? info->channelId : ret;
529 };
530 
CreateBaseChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)531 std::optional<ChannelInfo> ChannelManager::CreateBaseChannel(const std::string& channelName,
532     const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
533 {
534     HILOGI("start create base channel, dataType=%{public}d, name=%{public}s",
535         static_cast<int32_t>(dataType), channelName.c_str());
536     int32_t channelId = GenerateNextId(dataType);
537     if (!isValidChannelId(channelId)) {
538         HILOGE("Get channel id failed, id=%{public}d", channelId);
539         return std::nullopt;
540     }
541     ChannelInfo info;
542     info.channelId = channelId;
543     info.channelName = channelName;
544     info.status = ChannelStatus::UNCONNECTED;
545     info.dataType = dataType;
546     info.peerInfo = peerInfo;
547     return info;
548 }
549 
GenerateNextId(const ChannelDataType dataType)550 int32_t ChannelManager::GenerateNextId(const ChannelDataType dataType)
551 {
552     int32_t channelId = 0;
553     // lock for each type
554     std::lock_guard<std::mutex> typeLock(typeMutex_[dataType]);
555     HILOGI("create socket for %{public}d", static_cast<int32_t>(dataType));
556     channelId = nextIds_[dataType];
557     if (channelId - CHANNEL_ID_GAP * (static_cast<int32_t>(dataType) + 1)
558         >= CHANNEL_ID_GAP) {
559         HILOGE("type %{public}d exceed max channel",
560             static_cast<int32_t>(dataType));
561         return CHANNEL_NUM_EXCEED_LIMIT;
562     }
563     nextIds_[dataType]++;
564     return channelId;
565 }
566 
RegisterSocket(ChannelInfo & info,const ChannelDataType dataType)567 int32_t ChannelManager::RegisterSocket(ChannelInfo& info, const ChannelDataType dataType)
568 {
569     int32_t clientSocketId = CreateClientSocket(info.channelName,
570         info.peerInfo.peerName, info.peerInfo.networkId, dataType);
571     if (clientSocketId <= 0) {
572         HILOGE("create socket failed, ret: %{public}d", clientSocketId);
573         return CREATE_SOCKET_FAILED;
574     }
575     // save info to each map
576     {
577         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
578         socketChannelMap_.emplace(clientSocketId, info.channelId);
579         socketStatusMap_.emplace(clientSocketId, ChannelStatus::UNCONNECTED);
580     }
581     HILOGI("register channel name: %{public}s", info.channelName.c_str());
582     {
583         std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
584         info.clientSockets.push_back(clientSocketId);
585         info.dataSenderReceivers[clientSocketId] = std::make_unique<DataSenderReceiver>(clientSocketId);
586         channelIdMap_[info.channelName].push_back(info.channelId);
587         channelInfoMap_.emplace(info.channelId, std::move(info));
588     }
589     return ERR_OK;
590 }
591 
CreateClientSocket(const std::string & channelName,const std::string & peerName,const std::string & peerNetworkId,const ChannelDataType dataType)592 int32_t ChannelManager::CreateClientSocket(const std::string& channelName,
593     const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType)
594 {
595     HILOGI("start");
596     if (channelName.length() > MAX_CHANNEL_NAME_LENGTH) {
597         HILOGE("channel name too long, %{public}s", channelName.c_str());
598         return -INVALID_CHANNEL_NAME;
599     }
600     // ohos.dtbcollab.dms64_F_64
601     std::string name = SESSION_NAME_PREFIX + ownerName_ +
602         SPLIT_FLAG + CHANNEL_DATATYPE_PREFIX_MAP[dataType] + SPLIT_FLAG + channelName;
603     std::string peerSocketName = SESSION_NAME_PREFIX + peerName;
604     HILOGI("self-name: %{public}s, peerName: %{public}s", name.c_str(), peerSocketName.c_str());
605     SocketInfo socketInfo = {
606         .name = const_cast<char*>(name.c_str()),
607         .peerName = const_cast<char*>(peerSocketName.c_str()),
608         .peerNetworkId = const_cast<char*>(peerNetworkId.c_str()),
609         .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
610         .dataType = CHANNEL_SOFTBUS_DATATYPE_MAP[dataType]
611     };
612     int32_t sessionId = Socket(socketInfo);
613     HILOGI("finish, socket session id: %{public}d", sessionId);
614     return sessionId;
615 }
616 
isValidChannelId(const int32_t channelId)617 bool ChannelManager::isValidChannelId(const int32_t channelId)
618 {
619     return channelId > CHANNEL_ID_GAP && channelId <= (FILE_START_ID + CHANNEL_ID_GAP);
620 }
621 
DeleteChannel(const int32_t channelId)622 int32_t ChannelManager::DeleteChannel(const int32_t channelId)
623 {
624     HILOGI("start delete channel");
625     if (!isValidChannelId(channelId)) {
626         HILOGE("invalid channel id");
627         return INVALID_CHANNEL_ID;
628     }
629     ClearRegisterListener(channelId);
630     ClearSendTask(channelId);
631     ClearRegisterChannel(channelId);
632     ClearRegisterSocket(channelId);
633     HILOGI("end delete channel");
634     return channelId;
635 }
636 
ClearRegisterChannel(const int32_t channelId)637 void ChannelManager::ClearRegisterChannel(const int32_t channelId)
638 {
639     HILOGI("start clear channel info, channelId=%{public}d", channelId);
640     std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
641     std::string channelName;
642     auto infoIt = channelInfoMap_.find(channelId);
643     if (infoIt != channelInfoMap_.end()) {
644         channelName = infoIt->second.channelName;
645     }
646     channelInfoMap_.erase(channelId);
647 
648     auto idIt = channelIdMap_.find(channelName);
649     if (idIt != channelIdMap_.end()) {
650         idIt->second.erase(std::remove(idIt->second.begin(), idIt->second.end(), channelId), idIt->second.end());
651     }
652 }
653 
ClearRegisterListener(const int32_t channelId)654 void ChannelManager::ClearRegisterListener(const int32_t channelId)
655 {
656     HILOGI("start release listener, channelId=%{public}d", channelId);
657     std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
658     listenersMap_.erase(channelId);
659 }
660 
ClearRegisterSocket(const int32_t channelId)661 void ChannelManager::ClearRegisterSocket(const int32_t channelId)
662 {
663     HILOGI("start release socket, channelId=%{public}d", channelId);
664     std::vector<int32_t> socketIds;
665     {
666         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
667         if (!socketChannelMap_.empty()) {
668             for (auto&& socket : socketChannelMap_) {
669                 if (socket.second == channelId) {
670                     socketIds.push_back(socket.first);
671                 }
672             }
673         }
674         for (const auto socketId : socketIds) {
675             HILOGI("start release socket, %{public}d", socketId);
676             socketChannelMap_.erase(socketId);
677             socketStatusMap_.erase(socketId);
678         }
679     }
680     HILOGI("start to clear socket");
681     for (const auto socketId : socketIds) {
682         Shutdown(socketId);
683     }
684 }
685 
ClearSendTask(int32_t channelId)686 void ChannelManager::ClearSendTask(int32_t channelId)
687 {
688     HILOGI("clear send task for=%{public}d", channelId);
689     if (eventHandler_ != nullptr) {
690         eventHandler_->RemoveTask(std::to_string(channelId));
691     }
692 }
693 
RegisterChannelListener(const int32_t channelId,const std::shared_ptr<IChannelListener> listener)694 int32_t ChannelManager::RegisterChannelListener(const int32_t channelId,
695     const std::shared_ptr<IChannelListener> listener)
696 {
697     HILOGI("start register listener, channelId=%{public}d", channelId);
698     if (listener == nullptr) {
699         HILOGE("listener empty");
700         return INVALID_LISTENER;
701     }
702     std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
703     auto listenIt = listenersMap_.find(channelId);
704     if (listenIt == listenersMap_.end()) {
705         listenersMap_[channelId].emplace_back(listener);
706         return ERR_OK;
707     }
708     CleanInvalidListener(listenIt->second);
709     auto it = std::find_if(listenIt->second.begin(), listenIt->second.end(),
710         [&listener](const std::weak_ptr<IChannelListener> weakListener) {
711             if (auto ptr = weakListener.lock()) {
712                 return listener == ptr;
713             }
714             return false;
715         });
716     if (it != listenIt->second.end()) {
717         HILOGI("already exist listener");
718         return ERR_OK;
719     }
720     listenIt->second.emplace_back(listener);
721     return ERR_OK;
722 }
723 
CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>> & listeners)724 inline void ChannelManager::CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners)
725 {
726     listeners.erase(std::remove_if(listeners.begin(), listeners.end(),
727         [](const std::weak_ptr<IChannelListener> listener) {
728             return listener.expired();
729         }),
730         listeners.end());
731 }
732 
ConnectChannel(const int32_t channelId)733 int32_t ChannelManager::ConnectChannel(const int32_t channelId)
734 {
735     HILOGI("start to connect channel %{public}d, only allow client", channelId);
736     std::vector<int32_t> socketIds;
737     ChannelDataType dataType;
738     {
739         std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
740         auto infoIt = channelInfoMap_.find(channelId);
741         if (infoIt == channelInfoMap_.end() || infoIt->second.clientSockets.empty()) {
742             HILOGE("invalid channel id");
743             return INVALID_CHANNEL_ID;
744         }
745         dataType = infoIt->second.dataType;
746         socketIds.insert(socketIds.begin(), infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
747     }
748     HILOGI("end");
749     return DoBindSockets(socketIds, dataType);
750 }
751 
DoBindSockets(const std::vector<int32_t> & socketIds,const ChannelDataType dataType)752 int32_t ChannelManager::DoBindSockets(const std::vector<int32_t>& socketIds,
753     const ChannelDataType dataType)
754 {
755     HILOGI("start to connect sockets");
756     std::vector<std::future<int32_t>> bindTasks;
757     for (const auto& socketId : socketIds) {
758         if (GetSocketStatus(socketId) == ChannelStatus::UNCONNECTED) {
759             bindTasks.emplace_back(std::async(std::launch::async, [this, socketId, dataType]() {
760                 return BindSocket(socketId, dataType);
761             }));
762         }
763     }
764     if (bindTasks.empty()) {
765         return ERR_OK;
766     }
767     for (auto&& task : bindTasks) {
768         int32_t ret = task.get();
769         HILOGI("bind task ret=%{public}d", ret);
770         if (ret == ERR_OK) {
771             return ERR_OK;
772         }
773     }
774 
775     return CONNECT_CHANNEL_FAILED;
776 }
777 
GetSocketStatus(const int32_t socketId)778 ChannelStatus ChannelManager::GetSocketStatus(const int32_t socketId)
779 {
780     std::shared_lock<std::shared_mutex> readLock(socketMutex_);
781     auto it = socketStatusMap_.find(socketId);
782     if (it != socketStatusMap_.end()) {
783         return it->second;
784     }
785     return ChannelStatus::CONNECTED;
786 }
787 
BindSocket(const int32_t socketId,const ChannelDataType dataType)788 int32_t ChannelManager::BindSocket(const int32_t socketId, const ChannelDataType dataType)
789 {
790     QosSpeedType speedType = CHANNEL_DATATYPE_SPEED_MAP[dataType];
791     const QosTV* qos = qos_config[speedType];
792     const uint32_t qosCount = qos_speed_config[speedType];
793     int32_t ret = ERR_OK;
794     int retryCount = 0;
795     HILOGI("start to bind socket, id:%{public}d, speed:%{public}d", socketId, speedType);
796     do {
797         ret = Bind(socketId, qos, qosCount, &channelManagerListener);
798         if (ret == ERR_OK) {
799             break;
800         }
801         if (ret != SOFTBUS_LANE_WIFI_NOT_ONLINE) {
802             HILOGE("bind failed, err=%{public}d", ret);
803             return ret;
804         }
805         if (retryCount * BIND_RETRY_INTERVAL >= MAX_BIND_RETRY_TIME) {
806             HILOGE("bind failed after max retry time %{public}d ms", MAX_BIND_RETRY_TIME);
807             return ret;
808         }
809         HILOGI("bind failed, retrying after %{public}d ms, retry %{public}d", BIND_RETRY_INTERVAL, retryCount + 1);
810         usleep(BIND_RETRY_INTERVAL * MS_TO_US);
811         retryCount++;
812     } while (retryCount < MAX_RETRY_TIMES);
813     HILOGI("bind end");
814     if (ret != ERR_OK) {
815         HILOGE("client bind failed, ret: %{public}d", ret);
816         return BIND_SOCKET_FAILED;
817     }
818     if (dataType == ChannelDataType::FILE) {
819         if (dmsFileAdapetr_.SetFileSchema == nullptr) {
820             HILOGE("SetFileSchema is null.");
821             return INVALID_PARAMETERS_ERR;
822         }
823         ret = dmsFileAdapetr_.SetFileSchema(socketId);
824     }
825     if (ret != ERR_OK) {
826         HILOGE("register %{public}d file schema failed", socketId);
827         return REGISTER_FILE_SCHEMA_FAILED;
828     }
829     SetSocketStatus(socketId, ChannelStatus::CONNECTED);
830     return ret;
831 }
832 
SetSocketStatus(const int32_t socketId,const ChannelStatus status)833 int32_t ChannelManager::SetSocketStatus(const int32_t socketId, const ChannelStatus status)
834 {
835     HILOGI("start set socket id:%{public}d status %{public}d", socketId, static_cast<int32_t>(status));
836     int32_t channelId = 0;
837     {
838         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
839         auto it = socketStatusMap_.find(socketId);
840         if (it == socketStatusMap_.end()) {
841             HILOGE("no valid socket in socketStatusMap");
842             return INVALID_SOCKET_ID;
843         }
844         it->second = status;
845         auto channelIt = socketChannelMap_.find(socketId);
846         if (channelIt == socketChannelMap_.end()) {
847             HILOGE("no valid socket in socketChannelMap");
848             return INVALID_SOCKET_ID;
849         }
850         channelId = channelIt->second;
851     }
852     auto func = [channelId, this]() {
853         UpdateChannelStatus(channelId);
854     };
855     return PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
856 }
857 
UpdateChannelStatus(const int32_t channelId)858 int32_t ChannelManager::UpdateChannelStatus(const int32_t channelId)
859 {
860     HILOGI("update channel id=%{public}d", channelId);
861     std::vector<int32_t> socketIds;
862     ChannelStatus curStatus = ChannelStatus::UNCONNECTED;
863     {
864         std::shared_lock<std::shared_mutex> readLock(channelMutex_);
865         auto infoIt = channelInfoMap_.find(channelId);
866         if (infoIt == channelInfoMap_.end()) {
867             HILOGE("no valid channelInfo for %{public}d", channelId);
868             return INVALID_CHANNEL_ID;
869         }
870         for (const auto socket : infoIt->second.clientSockets) {
871             socketIds.push_back(socket);
872         }
873         curStatus = infoIt->second.status;
874     }
875 
876     ChannelStatus newStatus = ChannelStatus::UNCONNECTED;
877     for (const auto id : socketIds) {
878         if (GetSocketStatus(id) == ChannelStatus::CONNECTED) {
879             newStatus = ChannelStatus::CONNECTED;
880             break;
881         }
882     }
883     HILOGI("curStatus:%{public}d, newStatus:%{public}d",
884         static_cast<int32_t>(curStatus), static_cast<int32_t>(newStatus));
885     if (newStatus != curStatus) {
886         return SetChannelStatus(channelId, newStatus);
887     }
888     return ERR_OK;
889 }
890 
SetChannelStatus(const int32_t channelId,const ChannelStatus status)891 int32_t ChannelManager::SetChannelStatus(const int32_t channelId, const ChannelStatus status)
892 {
893     HILOGI("set channel:%{public}d, status:%{public}d", channelId, static_cast<int32_t>(status));
894     std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
895     auto infoIt = channelInfoMap_.find(channelId);
896     if (infoIt == channelInfoMap_.end()) {
897         HILOGE("no valid channelInfo for %{public}d", channelId);
898         return INVALID_CHANNEL_ID;
899     }
900     infoIt->second.status = status;
901     return ERR_OK;
902 }
903 
OnSocketConnected(int32_t socketId,const PeerSocketInfo & info)904 void ChannelManager::OnSocketConnected(int32_t socketId, const PeerSocketInfo& info)
905 {
906     if (socketId <= 0) {
907         HILOGE("invalid socketId: %{public}d", socketId);
908         return;
909     }
910     HILOGI("socket %{public}d binded", socketId);
911     std::optional<std::string> channelNameOpt = GetChannelNameFromSocket(info.name);
912     if (!channelNameOpt) {
913         HILOGE("error socket name, %{public}s", info.name);
914         return;
915     }
916     auto& channelName = *channelNameOpt;
917     std::optional<ChannelDataType> channelType = GetChannelDataTypeFromName(channelName);
918     if (!channelType) {
919         HILOGE("error channel name, %{public}s", channelName.c_str());
920         return;
921     }
922     // remove datatype flag
923     constexpr int32_t namePrefix = 2;
924     channelName = channelName.substr(namePrefix);
925     int32_t channelId = GetChannelId(channelName, *channelType);
926     if (!isValidChannelId(channelId)) {
927         HILOGE("invalid channelid=%{public}d with channelName %{public}s", channelId, channelName.c_str());
928         return;
929     }
930     auto func = [socketId, channelId, this]() {
931         UpdateChannel(socketId, channelId);
932     };
933     PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
934     HILOGI("add update channel task into handler");
935 }
936 
UpdateChannel(const int32_t socketId,const int32_t channelId)937 int32_t ChannelManager::UpdateChannel(const int32_t socketId, const int32_t channelId)
938 {
939     int32_t ret = RegisterSocket(socketId, channelId);
940     if (ret != ERR_OK) {
941         HILOGE("failed to save binded socket to matching channel");
942         DoErrorCallback(channelId, ret);
943         return ret;
944     }
945     ret = SetSocketStatus(socketId, ChannelStatus::CONNECTED);
946     if (ret != ERR_OK) {
947         HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
948         DoErrorCallback(channelId, ret);
949         return ret;
950     }
951     DoConnectCallback(channelId);
952     return ret;
953 }
954 
GetChannelNameFromSocket(const std::string & socketName)955 std::optional<std::string> ChannelManager::GetChannelNameFromSocket(const std::string& socketName)
956 {
957     size_t splitPos = socketName.find(SPLIT_FLAG, CHANNEL_NAME_PREFIX_LENGTH);
958     if (splitPos == std::string::npos) {
959         HILOGE("peer socket name invalid");
960         return std::nullopt;
961     }
962     return socketName.substr(splitPos + SPLIT_FLAG.length());
963 }
964 
GetChannelDataTypeFromName(const std::string & channelName)965 std::optional<ChannelDataType> ChannelManager::GetChannelDataTypeFromName(const std::string& channelName)
966 {
967     std::string prefix = channelName.substr(0, 1);
968     for (auto&& dataType : CHANNEL_DATATYPE_PREFIX_MAP) {
969         if (prefix == dataType.second) {
970             return dataType.first;
971         }
972     }
973     return std::nullopt;
974 }
975 
GetChannelId(const std::string & channelName,const ChannelDataType dataType)976 int32_t ChannelManager::GetChannelId(const std::string& channelName, const ChannelDataType dataType)
977 {
978     HILOGI("channelName: %{public}s, dataType: %{public}d", channelName.c_str(),
979         static_cast<int32_t>(dataType));
980     std::shared_lock<std::shared_mutex> readLock(channelMutex_);
981     auto it = channelIdMap_.find(channelName);
982     if (it == channelIdMap_.end()) {
983         HILOGE("no valid channel exist");
984         return INVALID_CHANNEL_NAME;
985     }
986     for (const auto channelId : it->second) {
987         auto infoIt = channelInfoMap_.find(channelId);
988         if (infoIt == channelInfoMap_.end()) {
989             HILOGE("no valid channel exist");
990             return INVALID_CHANNEL_ID;
991         }
992         // find matching dataType
993         if (infoIt->second.dataType == dataType) {
994             return infoIt->second.channelId;
995         }
996     }
997     HILOGE("no matching channel");
998     return NO_SUCH_CHANNEL;
999 }
1000 
RegisterSocket(const int32_t socketId,const int32_t channelId)1001 int32_t ChannelManager::RegisterSocket(const int32_t socketId, const int32_t channelId)
1002 {
1003     // update channelInfo
1004     HILOGI("register socket with channel, channelId=%{public}d, socketId=%{public}d", channelId, socketId);
1005     ChannelDataType dataType = ChannelDataType::BYTES;
1006     {
1007         std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
1008         auto infoIt = channelInfoMap_.find(channelId);
1009         if (infoIt == channelInfoMap_.end()) {
1010             HILOGE("no valid channel");
1011             return INVALID_CHANNEL_ID;
1012         }
1013         dataType = infoIt->second.dataType;
1014         infoIt->second.clientSockets.push_back(socketId);
1015         infoIt->second.dataSenderReceivers[socketId] = std::make_unique<DataSenderReceiver>(socketId);
1016     }
1017     // update socket
1018     {
1019         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
1020         socketChannelMap_[socketId] = channelId;
1021         socketStatusMap_[socketId] = ChannelStatus::CONNECTED;
1022     }
1023     if (dataType == ChannelDataType::FILE) {
1024         HILOGI("file socket, regist softbus file schema");
1025         if (dmsFileAdapetr_.SetFileSchema == nullptr) {
1026             HILOGE("SetFileSchema is null.");
1027             return INVALID_PARAMETERS_ERR;
1028         }
1029         int32_t ret = dmsFileAdapetr_.SetFileSchema(socketId);
1030         if (ret != ERR_OK) {
1031             HILOGE("register %{public}d file schema failed", socketId);
1032             return REGISTER_FILE_SCHEMA_FAILED;
1033         }
1034     }
1035     return ERR_OK;
1036 }
1037 
1038 template <typename Func, typename... Args>
NotifyListeners(const int32_t channelId,Func listenerFunc,const AppExecFwk::EventQueue::Priority priority,Args &&...args)1039 void ChannelManager::NotifyListeners(const int32_t channelId, Func listenerFunc,
1040     const AppExecFwk::EventQueue::Priority priority, Args&&... args)
1041 {
1042     std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1043     auto it = listenersMap_.find(channelId);
1044     if (it == listenersMap_.end() || it->second.empty()) {
1045         HILOGE("no matching listener to %{public}d", channelId);
1046         return;
1047     }
1048     auto& listeners = it->second;
1049     for (const auto& listener : listeners) {
1050         if (auto ptr = listener.lock()) {
1051             auto func = [ptr, listenerFunc, channelId, args...]() {
1052                 (ptr.get()->*listenerFunc)(channelId, std::forward<Args>(args)...);
1053             };
1054             PostCallbackTask(func, priority);
1055         }
1056     }
1057 }
1058 
1059 template <typename Func, typename... Args>
NotifyListenersNew(const int32_t channelId,Func listenerFunc,const AppExecFwk::EventQueue::Priority priority,Args &&...args)1060 void ChannelManager::NotifyListenersNew(const int32_t channelId, Func listenerFunc,
1061     const AppExecFwk::EventQueue::Priority priority, Args&&... args)
1062 {
1063     std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1064     auto it = listenersMap_.find(channelId);
1065     if (it == listenersMap_.end() || it->second.empty()) {
1066         HILOGE("no matching listener to %{public}d", channelId);
1067         return;
1068     }
1069     auto& listeners = it->second;
1070     for (const auto& listener : listeners) {
1071         if (auto ptr = listener.lock()) {
1072             auto func = [ptr, listenerFunc, channelId, args...]() {
1073                 (ptr.get()->*listenerFunc)(channelId, std::forward<Args>(args)...);
1074             };
1075             PostCallbackTaskNew(func, priority);
1076         }
1077     }
1078 }
1079 
OnSocketError(int32_t socketId,const int32_t errorCode)1080 void ChannelManager::OnSocketError(int32_t socketId, const int32_t errorCode)
1081 {
1082     int32_t channelId = 0;
1083     CHECK_SOCKET_ID(socketId);
1084     CHECK_CHANNEL_ID(socketId, channelId);
1085     DoErrorCallback(channelId, errorCode);
1086 }
1087 
DoErrorCallback(const int32_t channelId,const int32_t errorCode)1088 void ChannelManager::DoErrorCallback(const int32_t channelId, const int32_t errorCode)
1089 {
1090     NotifyListeners(channelId, &IChannelListener::OnError,
1091         AppExecFwk::EventQueue::Priority::IMMEDIATE, errorCode);
1092 }
1093 
DoConnectCallback(const int32_t channelId)1094 void ChannelManager::DoConnectCallback(const int32_t channelId)
1095 {
1096     NotifyListeners(channelId, &IChannelListener::OnConnect,
1097         AppExecFwk::EventQueue::Priority::IMMEDIATE);
1098 }
1099 
OnSocketClosed(int32_t socketId,const ShutdownReason & reason)1100 void ChannelManager::OnSocketClosed(int32_t socketId, const ShutdownReason& reason)
1101 {
1102     int32_t channelId = 0;
1103     CHECK_SOCKET_ID(socketId);
1104     CHECK_CHANNEL_ID(socketId, channelId);
1105     HILOGI("socket %{public}d closed, reason:%{public}d", socketId, reason);
1106     int32_t ret = SetSocketStatus(socketId, ChannelStatus::UNCONNECTED);
1107     if (ret != ERR_OK) {
1108         HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
1109         DoErrorCallback(channelId, ret);
1110         return;
1111     }
1112     // delete channel when all socket shutdown
1113     auto func = [channelId, reason, this]() {
1114         if (GetChannelStatus(channelId) == ChannelStatus::UNCONNECTED) {
1115             DoDisConnectCallback(channelId, reason);
1116             DeleteChannel(channelId);
1117         }
1118     };
1119     PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
1120 }
1121 
GetChannelId(const int32_t socketId)1122 int32_t ChannelManager::GetChannelId(const int32_t socketId)
1123 {
1124     std::shared_lock<std::shared_mutex> readLock(socketMutex_);
1125     auto it = socketChannelMap_.find(socketId);
1126     if (it == socketChannelMap_.end()) {
1127         HILOGE("no proper channelId to %{public}d", socketId);
1128         return INVALID_SOCKET_ID;
1129     }
1130     return it->second;
1131 }
1132 
DoDisConnectCallback(const int32_t channelId,const ShutdownReason & reason)1133 void ChannelManager::DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason)
1134 {
1135     NotifyListeners(channelId, &IChannelListener::OnDisConnect,
1136         AppExecFwk::EventQueue::Priority::IMMEDIATE, reason);
1137 }
1138 
GetChannelStatus(const int32_t channelId)1139 ChannelStatus ChannelManager::GetChannelStatus(const int32_t channelId)
1140 {
1141     std::shared_lock<std::shared_mutex> readLock(channelMutex_);
1142     auto it = channelInfoMap_.find(channelId);
1143     if (it != channelInfoMap_.end()) {
1144         return it->second.status;
1145     }
1146     return ChannelStatus::UNCONNECTED;
1147 }
1148 
1149 template <typename Func, typename... Args>
DoSendData(const int32_t channelId,Func doSendFunc,Args &&...args)1150 int32_t ChannelManager::DoSendData(const int32_t channelId, Func doSendFunc, Args&&... args)
1151 {
1152     HILOGI("start to send data");
1153     int32_t socketId = GetValidSocket(channelId);
1154     if (socketId <= 0) {
1155         HILOGE("no avaliable sockets, %{public}d", channelId);
1156         return NO_CONNECTED_SOCKET_ID;
1157     }
1158     int32_t ret = ERR_OK;
1159     {
1160         std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1161         auto infoIt = channelInfoMap_.find(channelId);
1162         if (infoIt == channelInfoMap_.end()) {
1163             HILOGE("no valid channel info exist");
1164             DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1165             return INVALID_CHANNEL_ID;
1166         }
1167         auto socketIt = infoIt->second.dataSenderReceivers.find(socketId);
1168         if (socketIt == infoIt->second.dataSenderReceivers.end()) {
1169             HILOGE("no valid socket");
1170             DoErrorCallback(channelId, INVALID_SOCKET_ID);
1171             return INVALID_SOCKET_ID;
1172         }
1173         auto& senderReceiver = *(socketIt->second);
1174         ret = (senderReceiver.*doSendFunc)(std::forward<Args>(args)...);
1175     }
1176     if (ret != ERR_OK) {
1177         HILOGE("failed send data, %{public}d", ret);
1178         DoErrorCallback(channelId, ret);
1179         return ret;
1180     }
1181     return ERR_OK;
1182 }
1183 
SendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1184 int32_t ChannelManager::SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data)
1185 {
1186     if (!isValidChannelId(channelId) || data == nullptr) {
1187         HILOGE("invalid channel id. %{public}d", channelId);
1188         return INVALID_CHANNEL_ID;
1189     }
1190     HILOGI("start to send bytes, %{public}u", static_cast<uint32_t>(data->Size()));
1191     auto func = [channelId, data, this]() {
1192         DoSendBytes(channelId, data);
1193     };
1194     int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1195         std::to_string(channelId));
1196     if (ret != ERR_OK) {
1197         HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1198         return ret;
1199     }
1200     HILOGI("send bytes task added to handler");
1201     return ERR_OK;
1202 }
1203 
DoSendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1204 inline int32_t ChannelManager::DoSendBytes(const int32_t channelId,
1205     const std::shared_ptr<AVTransDataBuffer>& data)
1206 {
1207     HILOGI("start to send bytes");
1208     return DoSendData(channelId, &DataSenderReceiver::SendBytesData, data);
1209 }
1210 
GetValidSocket(const int32_t channelId)1211 int32_t ChannelManager::GetValidSocket(const int32_t channelId)
1212 {
1213     std::vector<int32_t> socketIds;
1214     {
1215         std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1216         auto infoIt = channelInfoMap_.find(channelId);
1217         if (infoIt == channelInfoMap_.end() || infoIt->second.status == ChannelStatus::UNCONNECTED) {
1218             HILOGE("invalid channelId, %{public}d", channelId);
1219             return -1;
1220         }
1221         socketIds.insert(socketIds.begin(),
1222             infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
1223     }
1224 
1225     for (const auto socketId : socketIds) {
1226         if (GetSocketStatus(socketId) == ChannelStatus::CONNECTED) {
1227             return socketId;
1228         }
1229     }
1230     return -1;
1231 }
1232 
SendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1233 int32_t ChannelManager::SendStream(const int32_t channelId,
1234     const std::shared_ptr<AVTransStreamData>& data)
1235 {
1236     if (!isValidChannelId(channelId) || data == nullptr) {
1237         HILOGE("invalid channel id");
1238         DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1239         return INVALID_CHANNEL_ID;
1240     }
1241     HILOGI("start to send stream");
1242     auto func = [=]() {
1243         DoSendStream(channelId, data);
1244     };
1245     int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1246         std::to_string(channelId));
1247     if (ret != ERR_OK) {
1248         HILOGE("failed to add send stream task, ret=%{public}d", ret);
1249         return POST_TASK_FAILED;
1250     }
1251     HILOGI("send stream task added to handler");
1252     return ERR_OK;
1253 }
1254 
DoSendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1255 int32_t ChannelManager::DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1256 {
1257     HILOGI("start to send stream");
1258     return DoSendData(channelId, &DataSenderReceiver::SendStreamData, data);
1259 }
1260 
SendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1261 int32_t ChannelManager::SendMessage(const int32_t channelId,
1262     const std::shared_ptr<AVTransDataBuffer>& data)
1263 {
1264     if (!isValidChannelId(channelId) || data == nullptr) {
1265         HILOGE("invalid channel id. %{public}d", channelId);
1266         return INVALID_CHANNEL_ID;
1267     }
1268     HILOGI("start to send message, %{public}u", static_cast<uint32_t>(data->Size()));
1269     auto func = [channelId, data, this]() {
1270         DoSendMessage(channelId, data);
1271     };
1272     int32_t ret = PostMsgTask(func, AppExecFwk::EventQueue::Priority::HIGH);
1273     if (ret != ERR_OK) {
1274         HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1275         return ret;
1276     }
1277     HILOGD("send message task added to handler");
1278     return ERR_OK;
1279 }
1280 
DoSendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1281 int32_t ChannelManager::DoSendMessage(const int32_t channelId,
1282     const std::shared_ptr<AVTransDataBuffer>& data)
1283 {
1284     HILOGI("start to send message");
1285     return DoSendData(channelId, &DataSenderReceiver::SendMessageData, data);
1286 }
1287 
SendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1288 int32_t ChannelManager::SendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1289     const std::vector<std::string>& dFiles)
1290 {
1291     HILOGI("start to send files, %{public}d", channelId);
1292     if (!isValidChannelId(channelId) || sFiles.empty() || dFiles.empty()) {
1293         HILOGE("invalid channel id. %{public}d or empty sfiles", channelId);
1294         return INVALID_PARAMETERS_ERR;
1295     }
1296     if (sFiles.size() != dFiles.size() || sFiles.size() > MAX_FILE_COUNT) {
1297         HILOGE("src size:%{public}d, dst size:%{public}d illegal",
1298             static_cast<int32_t>(sFiles.size()),
1299             static_cast<int32_t>(dFiles.size()));
1300     }
1301     int32_t ret = ERR_OK;
1302     auto func = [channelId, sFiles, dFiles, this]() {
1303         DoSendFile(channelId, sFiles, dFiles);
1304     };
1305     ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW);
1306     if (ret != ERR_OK) {
1307         HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1308         return ret;
1309     }
1310     HILOGI("send files task added to handler");
1311     return ERR_OK;
1312 }
1313 
DoSendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1314 int32_t ChannelManager::DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1315     const std::vector<std::string>& dFiles)
1316 {
1317     HILOGI("start to do send files");
1318     return DoSendData(channelId, &DataSenderReceiver::SendFileData, sFiles, dFiles);
1319 }
1320 
OnBytesReceived(int32_t socketId,const void * data,const uint32_t dataLen)1321 void ChannelManager::OnBytesReceived(int32_t socketId,
1322     const void* data, const uint32_t dataLen)
1323 {
1324     int32_t channelId = 0;
1325     CHECK_SOCKET_ID(socketId);
1326     CHECK_CHANNEL_ID(socketId, channelId);
1327     CHECK_DATA_NULL(socketId, data, OnError);
1328     HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1329     std::shared_ptr<AVTransDataBuffer> packedData = ProcessRecvData(channelId, socketId, data, dataLen);
1330     if (!packedData) {
1331         return;
1332     }
1333     DoBytesReceiveCallback(channelId, packedData);
1334 }
1335 
ProcessRecvData(const int32_t channelId,const int32_t socketId,const void * data,const uint32_t dataLen)1336 std::shared_ptr<AVTransDataBuffer> ChannelManager::ProcessRecvData(const int32_t channelId,
1337     const int32_t socketId, const void* data, const uint32_t dataLen)
1338 {
1339     std::shared_lock<std::shared_mutex> readLock(channelMutex_);
1340     const uint8_t* header = static_cast<const uint8_t*>(data);
1341     auto infoIt = channelInfoMap_.find(channelId);
1342     if (infoIt == channelInfoMap_.end()) {
1343         DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1344         return nullptr;
1345     }
1346 
1347     int32_t ret = infoIt->second.dataSenderReceivers[socketId]->PackRecvPacketData(header, dataLen);
1348     if (ret != ERR_OK) {
1349         HILOGE("pack recv data failed");
1350         DoErrorCallback(channelId, ret);
1351         return nullptr;
1352     }
1353     return infoIt->second.dataSenderReceivers[socketId]->GetPacketedData();
1354 }
1355 
DoBytesReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1356 void ChannelManager::DoBytesReceiveCallback(const int32_t channelId,
1357     const std::shared_ptr<AVTransDataBuffer>& buffer)
1358 {
1359     NotifyListeners(channelId, &IChannelListener::OnBytes,
1360         AppExecFwk::EventQueue::Priority::LOW, buffer);
1361 }
1362 
OnMessageReceived(int32_t socketId,const void * data,const uint32_t dataLen)1363 void ChannelManager::OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen)
1364 {
1365     HILOGI("data len = %{public}d", dataLen);
1366     if (dataLen > MAX_LEN) {
1367         HILOGE("dataLen is too long");
1368         return;
1369     }
1370     int32_t channelId = 0;
1371     CHECK_SOCKET_ID(socketId);
1372     CHECK_CHANNEL_ID(socketId, channelId);
1373     CHECK_DATA_NULL(socketId, data, OnError);
1374     HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1375     std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(dataLen);
1376     int32_t ret = memcpy_s(buffer->Data(),
1377         buffer->Size(), data, dataLen);
1378     if (ret != ERR_OK) {
1379         HILOGE("pack recv data failed");
1380         DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1381         return;
1382     }
1383     DoMessageReceiveCallback(channelId, buffer);
1384 }
1385 
DoMessageReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1386 void ChannelManager::DoMessageReceiveCallback(const int32_t channelId,
1387     const std::shared_ptr<AVTransDataBuffer>& buffer)
1388 {
1389     NotifyListenersNew(channelId, &IChannelListener::OnMessage,
1390         AppExecFwk::EventQueue::Priority::HIGH, buffer);
1391 }
1392 
OnStreamReceived(int32_t socketId,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)1393 void ChannelManager::OnStreamReceived(int32_t socketId, const StreamData* data,
1394     const StreamData* ext, const StreamFrameInfo* param)
1395 {
1396     int32_t channelId = 0;
1397     CHECK_SOCKET_ID(socketId);
1398     CHECK_CHANNEL_ID(socketId, channelId);
1399     CHECK_DATA_NULL(socketId, data, OnError);
1400     CHECK_DATA_NULL(socketId, ext, OnError);
1401     std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(data->bufLen);
1402     int32_t ret = memcpy_s(buffer->Data(), buffer->Size(), data->buf, data->bufLen);
1403     if (ret != ERR_OK) {
1404         HILOGE("copy stream data failed, %{public}d", socketId);
1405         DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1406         return;
1407     }
1408     AVTransStreamDataExt streamDataExt;
1409     std::shared_ptr<AVTransStreamData> streamData = std::make_shared<AVTransStreamData>(buffer, streamDataExt);
1410     ret = streamData->DeserializeStreamDataExt(ext->buf);
1411     if (ret != ERR_OK) {
1412         HILOGE("deserialize stream ext failed, %{public}d", socketId);
1413         DoErrorCallback(channelId, PARSE_AV_TRANS_STREAM_EXT_FAILED);
1414         return;
1415     }
1416     DoStreamReceiveCallback(channelId, streamData);
1417 }
1418 
DoStreamReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1419 void ChannelManager::DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1420 {
1421     NotifyListeners(channelId, &IChannelListener::OnStream,
1422         AppExecFwk::EventQueue::Priority::LOW, data);
1423 }
1424 
OnFileEventReceived(int32_t socketId,FileEvent * event)1425 void ChannelManager::OnFileEventReceived(int32_t socketId, FileEvent *event)
1426 {
1427     int32_t channelId = 0;
1428     CHECK_SOCKET_ID(socketId);
1429     // update recv path before onbind
1430     if (event == nullptr) {
1431         HILOGE("socket %{public}d event empty", socketId);
1432         return;
1433     }
1434     if (event->type == FileEventType::FILE_EVENT_RECV_UPDATE_PATH) {
1435         HILOGI("start to set update path func, %{public}d", socketId);
1436         return DispatchProcessFileEvent(fileChannelId_.load(), event);
1437     }
1438     CHECK_CHANNEL_ID(socketId, channelId);
1439     CHECK_DATA_NULL(socketId, event, OnError);
1440     HILOGI("start to dispatch file event, %{public}d", channelId);
1441     DispatchProcessFileEvent(channelId, event);
1442 }
1443 
DispatchProcessFileEvent(int32_t channelId,FileEvent * event)1444 void ChannelManager::DispatchProcessFileEvent(int32_t channelId, FileEvent *event)
1445 {
1446     HILOGI("start to dispatch file event");
1447     switch (event->type) {
1448         case FileEventType::FILE_EVENT_SEND_PROCESS:
1449         case FileEventType::FILE_EVENT_SEND_FINISH: {
1450             DealFileSendEvent(channelId, event);
1451             break;
1452         }
1453         case FileEventType::FILE_EVENT_RECV_START:
1454         case FileEventType::FILE_EVENT_RECV_PROCESS:
1455         case FileEventType::FILE_EVENT_RECV_FINISH: {
1456             DealFileRecvEvent(channelId, event);
1457             break;
1458         }
1459         case FileEventType::FILE_EVENT_BUTT:
1460         case FileEventType::FILE_EVENT_SEND_ERROR:
1461         case FileEventType::FILE_EVENT_RECV_ERROR: {
1462             DealFileErrorEvent(channelId, event);
1463             break;
1464         }
1465         case FileEventType::FILE_EVENT_RECV_UPDATE_PATH: {
1466             DealFileUpdatePathEvent(channelId, event);
1467             break;
1468         }
1469         default:
1470             break;
1471     }
1472 }
1473 
DealFileSendEvent(int32_t channelId,FileEvent * event)1474 void ChannelManager::DealFileSendEvent(int32_t channelId, FileEvent *event)
1475 {
1476     HILOGI("start to deal file send event, %{public}d", channelId);
1477     FileInfo info;
1478     if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1479         info.commonInfo.eventType = ChannelFileEvent::SEND_PROCESS;
1480     } else {
1481         info.commonInfo.eventType = ChannelFileEvent::SEND_FINISH;
1482     }
1483     info.commonInfo.fileCnt = event->fileCnt;
1484     for (uint32_t i = 0; i < event->fileCnt; ++i) {
1485         info.commonInfo.fileList.push_back(std::string(event->files[i]));
1486     }
1487 
1488     FileSendInfo sendInfo;
1489     sendInfo.bytesProcessed = event->bytesProcessed;
1490     sendInfo.bytesTotal = event->bytesTotal;
1491     if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1492         sendInfo.rate = event->rate;
1493     }
1494     info.sendInfo = sendInfo;
1495     DoFileSendCallback(channelId, info);
1496     HILOGI("end");
1497 }
1498 
DealFileRecvEvent(int32_t channelId,FileEvent * event)1499 void ChannelManager::DealFileRecvEvent(int32_t channelId, FileEvent *event)
1500 {
1501     HILOGI("start to deal file recv event, %{public}d", channelId);
1502     FileInfo info;
1503     if (event->type == FileEventType::FILE_EVENT_RECV_START) {
1504         info.commonInfo.eventType = ChannelFileEvent::RECV_START;
1505     } else if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1506         info.commonInfo.eventType = ChannelFileEvent::RECV_PROCESS;
1507     } else {
1508         info.commonInfo.eventType = ChannelFileEvent::RECV_FINISH;
1509     }
1510     info.commonInfo.fileCnt = event->fileCnt;
1511     for (uint32_t i = 0; i < event->fileCnt; ++i) {
1512         info.commonInfo.fileList.push_back(std::string(event->files[i]));
1513     }
1514 
1515     FileRecvInfo recvInfo;
1516     recvInfo.bytesProcessed = event->bytesProcessed;
1517     recvInfo.bytesTotal = event->bytesTotal;
1518     if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1519         recvInfo.rate = event->rate;
1520     }
1521     info.recvInfo = recvInfo;
1522     DoFileRecvCallback(channelId, info);
1523     HILOGI("end");
1524 }
1525 
DealFileErrorEvent(int32_t channelId,FileEvent * event)1526 void ChannelManager::DealFileErrorEvent(int32_t channelId, FileEvent *event)
1527 {
1528     HILOGI("start to deal file error event, %{public}d", channelId);
1529     FileInfo info;
1530     if (event->type == FileEventType::FILE_EVENT_SEND_ERROR) {
1531         info.commonInfo.eventType = ChannelFileEvent::SEND_ERROR;
1532     } else {
1533         info.commonInfo.eventType = ChannelFileEvent::RECV_ERROR;
1534     }
1535     info.commonInfo.fileCnt = event->fileCnt;
1536     for (uint32_t i = 0; i < event->fileCnt; ++i) {
1537         info.commonInfo.fileList.push_back(std::string(event->files[i]));
1538     }
1539 
1540     FileErrorInfo errorInfo;
1541     errorInfo.errorCode = event->errorCode;
1542     info.errorInfo = errorInfo;
1543     if (info.commonInfo.eventType == ChannelFileEvent::RECV_ERROR) {
1544         DoFileRecvCallback(channelId, info);
1545     } else {
1546         DoFileSendCallback(channelId, info);
1547     }
1548     HILOGI("end");
1549 }
1550 
DealFileUpdatePathEvent(int32_t channelId,FileEvent * event)1551 void ChannelManager::DealFileUpdatePathEvent(int32_t channelId, FileEvent *event)
1552 {
1553     HILOGI("start to deal file update path event, %{public}d", channelId);
1554     event->UpdateRecvPath = GetRecvPath;
1555     HILOGI("end");
1556 }
1557 
GetRecvPathFromUser()1558 const char* ChannelManager::GetRecvPathFromUser()
1559 {
1560     HILOGI("get recv path from user");
1561     int32_t channelId = static_cast<int32_t>(fileChannelId_.load());
1562     std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1563     auto it = listenersMap_.find(channelId);
1564     if (it == listenersMap_.end() || it->second.empty()) {
1565         HILOGE("no matching listener to %{public}d", channelId);
1566         return nullptr;
1567     }
1568     auto& listeners = it->second;
1569     for (const auto& listener : listeners) {
1570         if (auto ptr = listener.lock()) {
1571             return ptr->GetRecvPath(channelId);
1572         }
1573     }
1574     return nullptr;
1575 }
1576 
DoFileRecvCallback(const int32_t channelId,const FileInfo & info)1577 void ChannelManager::DoFileRecvCallback(const int32_t channelId, const FileInfo& info)
1578 {
1579     NotifyListeners(channelId, &IChannelListener::OnRecvFile,
1580         AppExecFwk::EventQueue::Priority::HIGH, info);
1581 }
1582 
DoFileSendCallback(const int32_t channelId,const FileInfo & info)1583 void ChannelManager::DoFileSendCallback(const int32_t channelId, const FileInfo& info)
1584 {
1585     NotifyListeners(channelId, &IChannelListener::OnSendFile,
1586         AppExecFwk::EventQueue::Priority::HIGH, info);
1587 }
1588 }
1589 }
1590