• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #include "channel_manager.h"
17 #include "dtbcollabmgr_log.h"
18 #include "softbus_file_adapter.h"
19 #include <algorithm>
20 #include <chrono>
21 #include <future>
22 #include <sys/prctl.h>
23 #include "securec.h"
24 #include <unordered_set>
25 
26 namespace OHOS {
27 namespace DistributedCollab {
28 IMPLEMENT_SINGLE_INSTANCE(ChannelManager);
// File-local configuration tables, QoS presets and guard macros used by ChannelManager.
namespace {
    // Presumably the tag consumed by the HILOG* macros in this file - confirm in dtbcollabmgr_log.h.
    static const std::string TAG = "DSchedCollabChannelManager";

    // Coarse QoS class; used as the key of qos_config / qos_speed_config below.
    enum class QosSpeedType {
        HIGH,
        LOW
    };

    // Channel data type -> softbus TransDataType placed into SocketInfo::dataType.
    // Non-const because callers look entries up with operator[].
    static std::map<ChannelDataType, TransDataType> CHANNEL_SOFTBUS_DATATYPE_MAP = {
        { ChannelDataType::MESSAGE, DATA_TYPE_MESSAGE },
        { ChannelDataType::BYTES, DATA_TYPE_BYTES },
        { ChannelDataType::VIDEO_STREAM, DATA_TYPE_VIDEO_STREAM },
        { ChannelDataType::FILE, DATA_TYPE_FILE }
    };

    // Channel data type -> QoS class: stream and file channels are HIGH, message and bytes are LOW.
    static std::map<ChannelDataType, QosSpeedType> CHANNEL_DATATYPE_SPEED_MAP = {
        { ChannelDataType::MESSAGE, QosSpeedType::LOW },
        { ChannelDataType::BYTES, QosSpeedType::LOW },
        { ChannelDataType::VIDEO_STREAM, QosSpeedType::HIGH },
        { ChannelDataType::FILE, QosSpeedType::HIGH }
    };

    // Building blocks of socket/session names: separator, package name, and session-name prefix.
    static const std::string SPLIT_FLAG = "_";
    static const std::string COLLAB_PGK_NAME = "dms";
    static const std::string SESSION_NAME_PREFIX = "ohos.dtbcollab.dms";

    // One-letter data type marker embedded in client socket names.
    static std::map<ChannelDataType, std::string> CHANNEL_DATATYPE_PREFIX_MAP = {
        { ChannelDataType::MESSAGE, "M" },
        { ChannelDataType::BYTES, "B" },
        { ChannelDataType::VIDEO_STREAM, "V" },
        { ChannelDataType::FILE, "F" }
    };

    // LOW QoS preset values. NOTE(review): units are not stated here - confirm against the softbus QosTV docs.
    static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
    static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY = 10000;
    static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY = 2000;

    // QoS table for LOW speed channels.
    static QosTV g_low_qosInfo[] = {
        { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW },
        { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY },
        { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY },
        { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
    };

    // HIGH QoS preset values; currently numerically identical to the LOW preset above.
    static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
    static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY = 10000;
    static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY = 2000;

    // QoS table for HIGH speed channels.
    static QosTV g_high_qosInfo[] = {
        { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW },
        { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY },
        { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY },
        { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
    };

    // QoS class -> pointer to the first element of the matching QosTV table.
    static std::map<QosSpeedType, QosTV*> qos_config = {
        { QosSpeedType::HIGH, g_high_qosInfo },
        { QosSpeedType::LOW, g_low_qosInfo }
    };

    // Despite the "Index" suffix these hold the ELEMENT COUNT of each QosTV table.
    static uint32_t g_lowQosTvParamIndex = static_cast<uint32_t>(sizeof(g_low_qosInfo) / sizeof(QosTV));
    static uint32_t g_highQosTvParamIndex = static_cast<uint32_t>(sizeof(g_high_qosInfo) / sizeof(QosTV));
    // QoS class -> number of entries in the matching QosTV table.
    static std::map<QosSpeedType, uint32_t> qos_speed_config = {
        { QosSpeedType::HIGH, g_highQosTvParamIndex },
        { QosSpeedType::LOW, g_lowQosTvParamIndex }
    };

// Logs and returns from the enclosing function when socketId is not positive.
// Uses a bare `return;`, so it is only usable inside functions returning void.
#define CHECK_SOCKET_ID(socketId)                          \
do {                                                       \
    if ((socketId) <= 0) {                                 \
        HILOGE("invalid socket id, %{public}d", socketId); \
        return;                                            \
    }                                                      \
} while (0)

// Assigns GetChannelId(socketId) to channelId, then logs and returns from the enclosing
// void function when the result fails isValidChannelId(). channelId must be an assignable lvalue.
#define CHECK_CHANNEL_ID(socketId, channelId)                                 \
do {                                                                          \
    (channelId) = GetChannelId(socketId);                                     \
    if (!isValidChannelId(channelId)) {                                       \
        HILOGE("invalid socket id %{public}d, can't find channel", socketId); \
        return;                                                               \
    }                                                                         \
} while (0)

// When data is nullptr: logs, invokes errorHandler(socketId, RECV_DATA_EMPTY) and returns
// from the enclosing void function.
#define CHECK_DATA_NULL(socketId, data, errorHandler)                      \
do {                                                                       \
    if ((data) == nullptr) {                                               \
        HILOGE("receive empty bytes data, socketId=%{public}d", socketId); \
        (errorHandler)(socketId, RECV_DATA_EMPTY);                        \
        return;                                                            \
    }                                                                      \
} while (0)
}
122 
OnSocketConnected(int32_t socket,PeerSocketInfo info)123 static void OnSocketConnected(int32_t socket, PeerSocketInfo info)
124 {
125     ChannelManager::GetInstance().OnSocketConnected(socket, info);
126 }
127 
OnSocketClosed(int32_t socket,ShutdownReason reason)128 static void OnSocketClosed(int32_t socket, ShutdownReason reason)
129 {
130     ChannelManager::GetInstance().OnSocketClosed(socket, reason);
131 }
132 
OnBytesRecv(int32_t socket,const void * data,uint32_t dataLen)133 static void OnBytesRecv(int32_t socket, const void* data, uint32_t dataLen)
134 {
135     ChannelManager::GetInstance().OnBytesReceived(socket, data, dataLen);
136 }
137 
OnMessageRecv(int32_t socket,const void * data,uint32_t dataLen)138 static void OnMessageRecv(int32_t socket, const void* data, uint32_t dataLen)
139 {
140     ChannelManager::GetInstance().OnMessageReceived(socket, data, dataLen);
141 }
142 
OnStreamRecv(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)143 static void OnStreamRecv(int32_t socket, const StreamData* data, const StreamData* ext,
144     const StreamFrameInfo* param)
145 {
146     ChannelManager::GetInstance().OnStreamReceived(socket, data, ext, param);
147 }
148 
OnError(int32_t socket,int32_t errCode)149 static void OnError(int32_t socket, int32_t errCode)
150 {
151     ChannelManager::GetInstance().OnSocketError(socket, errCode);
152 }
153 
OnFileEvent(int32_t socket,FileEvent * event)154 static void OnFileEvent(int32_t socket, FileEvent *event)
155 {
156     ChannelManager::GetInstance().OnFileEventReceived(socket, event);
157 }
158 
GetRecvPath()159 static const char* GetRecvPath()
160 {
161     return ChannelManager::GetInstance().GetRecvPathFromUser();
162 }
163 
// Callback table handed to softbus Listen()/Bind(); every entry is one of the static
// trampolines above, which forward to the ChannelManager singleton.
// Designated initializers must stay in the declaration order of ISocketListener.
ISocketListener channelManagerListener = {
    .OnBind = OnSocketConnected,
    .OnShutdown = OnSocketClosed,
    .OnBytes = OnBytesRecv,
    .OnMessage = OnMessageRecv,
    .OnStream = OnStreamRecv,
    .OnFile = OnFileEvent,
    .OnError = OnError,
};
173 
~ChannelManager()174 ChannelManager::~ChannelManager()
175 {
176     DeInit();
177 };
178 
Init(const std::string & ownerName)179 int32_t ChannelManager::Init(const std::string& ownerName)
180 {
181     HILOGI("start init channel manager");
182     if (eventHandler_ != nullptr && callbackEventHandler_ != nullptr && msgEventHandler_ != nullptr) {
183         HILOGW("server channel already init");
184         return ERR_OK;
185     }
186     if (serverSocketId_ > 0) {
187         HILOGW("server socket already init");
188         return ERR_OK;
189     }
190     ownerName_ = ownerName;
191 
192     eventThread_ = std::thread(&ChannelManager::StartEvent, this);
193     std::unique_lock<std::mutex> lock(eventMutex_);
194     eventCon_.wait(lock, [this] {
195         return eventHandler_ != nullptr;
196     });
197 
198     callbackEventThread_ = std::thread(&ChannelManager::StartCallbackEvent, this);
199     std::unique_lock<std::mutex> callbackLock(callbackEventMutex_);
200     callbackEventCon_.wait(callbackLock, [this] {
201         return callbackEventHandler_ != nullptr;
202     });
203 
204     msgEventThread_ = std::thread(&ChannelManager::StartMsgEvent, this);
205     std::unique_lock<std::mutex> msgLock(msgEventMutex_);
206     msgEventCon_.wait(msgLock, [this] {
207         return msgEventHandler_ != nullptr;
208     });
209 
210     int32_t socketServerId = CreateServerSocket();
211     if (socketServerId <= 0) {
212         HILOGE("create socket failed, ret: %{public}d", socketServerId);
213         return CREATE_SOCKET_FAILED;
214     }
215     int32_t ret = Listen(socketServerId, g_low_qosInfo,
216         g_lowQosTvParamIndex, &channelManagerListener);
217     if (ret != ERR_OK) {
218         HILOGE("service listen failed, ret: %{public}d", ret);
219         return LISTEN_SOCKET_FAILED;
220     }
221     serverSocketId_ = socketServerId;
222     HILOGI("end");
223     return ERR_OK;
224 }
225 
StartEvent()226 void ChannelManager::StartEvent()
227 {
228     HILOGI("StartEvent start");
229     prctl(PR_SET_NAME, ownerName_.c_str());
230     auto runner = AppExecFwk::EventRunner::Create(false);
231     {
232         std::lock_guard<std::mutex> lock(eventMutex_);
233         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
234     }
235     eventCon_.notify_one();
236     runner->Run();
237     HILOGI("StartEvent end");
238 }
239 
StartCallbackEvent()240 void ChannelManager::StartCallbackEvent()
241 {
242     HILOGI("Start callback event start");
243     std::string callbackName = ownerName_ + "callback";
244     prctl(PR_SET_NAME, callbackName.c_str());
245     auto runner = AppExecFwk::EventRunner::Create(false);
246     {
247         std::lock_guard<std::mutex> lock(callbackEventMutex_);
248         callbackEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
249     }
250     callbackEventCon_.notify_one();
251     runner->Run();
252     HILOGI("callback event end");
253 }
254 
StartMsgEvent()255 void ChannelManager::StartMsgEvent()
256 {
257     HILOGI("Start msg event start");
258     std::string msgName = ownerName_ + "msg";
259     prctl(PR_SET_NAME, msgName.c_str());
260     auto runner = AppExecFwk::EventRunner::Create(false);
261     {
262         std::lock_guard<std::mutex> lock(msgEventMutex_);
263         msgEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
264     }
265     msgEventCon_.notify_one();
266     runner->Run();
267     HILOGI("msg event end");
268 }
269 
PostTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority,const std::string & name)270 int32_t ChannelManager::PostTask(const AppExecFwk::InnerEvent::Callback& callback,
271     const AppExecFwk::EventQueue::Priority priority, const std::string& name)
272 {
273     if (eventHandler_ == nullptr) {
274         HILOGE("event handler empty");
275         return NULL_EVENT_HANDLER;
276     }
277     if (eventHandler_->PostTask(callback, name, 0, priority)) {
278         return ERR_OK;
279     }
280     HILOGE("add task failed");
281     return POST_TASK_FAILED;
282 }
283 
PostCallbackTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)284 int32_t ChannelManager::PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback,
285     const AppExecFwk::EventQueue::Priority priority)
286 {
287     if (callbackEventHandler_ == nullptr) {
288         HILOGE("callback event handler empty");
289         return NULL_EVENT_HANDLER;
290     }
291     if (callbackEventHandler_->PostTask(callback, priority)) {
292         return ERR_OK;
293     }
294     HILOGE("add callback task failed");
295     return POST_TASK_FAILED;
296 }
297 
PostMsgTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)298 int32_t ChannelManager::PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback,
299     const AppExecFwk::EventQueue::Priority priority)
300 {
301     if (msgEventHandler_ == nullptr) {
302         HILOGE("msg event handler empty");
303         return NULL_EVENT_HANDLER;
304     }
305     if (msgEventHandler_->PostTask(callback, priority)) {
306         return ERR_OK;
307     }
308     HILOGE("add msg task failed");
309     return POST_TASK_FAILED;
310 }
311 
CreateServerSocket()312 int32_t ChannelManager::CreateServerSocket()
313 {
314     HILOGI("start create server socket");
315     std::string sessionName = SESSION_NAME_PREFIX + ownerName_;
316     HILOGI("sessionName: %{public}s, size: %{public}zu", sessionName.c_str(),
317         sessionName.length());
318     SocketInfo info = {
319         .name = const_cast<char*>(sessionName.c_str()),
320         .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
321     };
322     int32_t socket = Socket(info);
323     HILOGI("finish, socket id: %{public}d", socket);
324     return socket;
325 }
326 
DeInit()327 void ChannelManager::DeInit()
328 {
329     HILOGI("start deinit channel manager");
330     // stop all task
331     if (eventHandler_ != nullptr) {
332         eventHandler_->GetEventRunner()->Stop();
333         if (eventThread_.joinable()) {
334             eventThread_.join();
335         }
336         eventHandler_ = nullptr;
337     } else {
338         HILOGE("eventHandler_ is nullptr");
339     }
340 
341     // stop callback task
342     if (callbackEventHandler_ != nullptr) {
343         callbackEventHandler_->GetEventRunner()->Stop();
344         if (callbackEventThread_.joinable()) {
345             callbackEventThread_.join();
346         }
347         callbackEventHandler_ = nullptr;
348     } else {
349         HILOGE("callbackEventHandler_ is nullptr");
350     }
351 
352     // stop msg task
353     if (msgEventHandler_ != nullptr) {
354         msgEventHandler_->GetEventRunner()->Stop();
355         if (msgEventThread_.joinable()) {
356             msgEventThread_.join();
357         }
358         msgEventHandler_ = nullptr;
359     } else {
360         HILOGE("msgEventHandler_ is nullptr");
361     }
362 
363     // release channels
364     std::unordered_set<int32_t> channelIds;
365     for (const auto& entry : channelIdMap_) {
366         for (int32_t id : entry.second) {
367             channelIds.insert(id);
368         }
369     }
370     for (const int32_t id : channelIds) {
371         DeleteChannel(id);
372     }
373     Shutdown(serverSocketId_);
374     Reset();
375     HILOGI("end");
376 }
377 
Reset()378 void ChannelManager::Reset()
379 {
380     HILOGI("reset channel manager");
381     serverSocketId_ = -1;
382     ownerName_ = "";
383     nextIds_ = {
384         { ChannelDataType::MESSAGE, MESSAGE_START_ID },
385         { ChannelDataType::BYTES, BYTES_START_ID },
386         { ChannelDataType::VIDEO_STREAM, STREAM_START_ID },
387         { ChannelDataType::FILE, FILE_START_ID }
388     };
389 }
390 
GetVersion()391 int32_t ChannelManager::GetVersion()
392 {
393     return VERSION_;
394 }
395 
CreateServerChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)396 int32_t ChannelManager::CreateServerChannel(const std::string& channelName,
397     const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
398 {
399     HILOGI("start to creat server channel waiting for connect");
400     std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
401     if (!info) {
402         HILOGE("Create server channel failed");
403         return CREATE_SERVER_CHANNEL_FAILED;
404     }
405     std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
406     channelIdMap_[channelName].push_back(info->channelId);
407     channelInfoMap_.emplace(info->channelId, std::move(*info));
408     // save file channel
409     fileChannelId_.store(info->channelId);
410     HILOGI("end");
411     return info->channelId;
412 }
413 
CreateClientChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)414 int32_t ChannelManager::CreateClientChannel(const std::string& channelName,
415     const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
416 {
417     HILOGI("start to creat client channel to connect other");
418     std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
419     if (!info) {
420         HILOGE("Create client channel failed");
421         return CREATE_CLIENT_CHANNEL_FAILED;
422     }
423     int32_t ret = RegisterSocket(*info, dataType);
424     HILOGI("end");
425     return ret == ERR_OK ? info->channelId : ret;
426 };
427 
CreateBaseChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)428 std::optional<ChannelInfo> ChannelManager::CreateBaseChannel(const std::string& channelName,
429     const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
430 {
431     HILOGI("start create base channel, dataType=%{public}d, name=%{public}s",
432         static_cast<int32_t>(dataType), channelName.c_str());
433     int32_t channelId = GenerateNextId(dataType);
434     if (!isValidChannelId(channelId)) {
435         HILOGE("Get channel id failed, id=%{public}d", channelId);
436         return std::nullopt;
437     }
438     ChannelInfo info;
439     info.channelId = channelId;
440     info.channelName = channelName;
441     info.status = ChannelStatus::UNCONNECTED;
442     info.dataType = dataType;
443     info.peerInfo = peerInfo;
444     return info;
445 }
446 
GenerateNextId(const ChannelDataType dataType)447 int32_t ChannelManager::GenerateNextId(const ChannelDataType dataType)
448 {
449     int32_t channelId = 0;
450     // lock for each type
451     std::lock_guard<std::mutex> typeLock(typeMutex_[dataType]);
452     HILOGI("create socket for %{public}d", static_cast<int32_t>(dataType));
453     channelId = nextIds_[dataType];
454     if (channelId - CHANNEL_ID_GAP * (static_cast<int32_t>(dataType) + 1)
455         >= CHANNEL_ID_GAP) {
456         HILOGE("type %{public}d exceed max channel",
457             static_cast<int32_t>(dataType));
458         return CHANNEL_NUM_EXCEED_LIMIT;
459     }
460     nextIds_[dataType]++;
461     return channelId;
462 }
463 
RegisterSocket(ChannelInfo & info,const ChannelDataType dataType)464 int32_t ChannelManager::RegisterSocket(ChannelInfo& info, const ChannelDataType dataType)
465 {
466     int32_t clientSocketId = CreateClientSocket(info.channelName,
467         info.peerInfo.peerName, info.peerInfo.networkId, dataType);
468     if (clientSocketId <= 0) {
469         HILOGE("create socket failed, ret: %{public}d", clientSocketId);
470         return CREATE_SOCKET_FAILED;
471     }
472     // save info to each map
473     {
474         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
475         socketChannelMap_.emplace(clientSocketId, info.channelId);
476         socketStatusMap_.emplace(clientSocketId, ChannelStatus::UNCONNECTED);
477     }
478     HILOGI("register channel name: %{public}s", info.channelName.c_str());
479     {
480         std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
481         info.clientSockets.push_back(clientSocketId);
482         info.dataSenderReceivers[clientSocketId] = std::make_unique<DataSenderReceiver>(clientSocketId);
483         channelIdMap_[info.channelName].push_back(info.channelId);
484         channelInfoMap_.emplace(info.channelId, std::move(info));
485     }
486     return ERR_OK;
487 }
488 
CreateClientSocket(const std::string & channelName,const std::string & peerName,const std::string & peerNetworkId,const ChannelDataType dataType)489 int32_t ChannelManager::CreateClientSocket(const std::string& channelName,
490     const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType)
491 {
492     HILOGI("start");
493     if (channelName.length() > MAX_CHANNEL_NAME_LENGTH) {
494         HILOGE("channel name too long, %{public}s", channelName.c_str());
495         return -INVALID_CHANNEL_NAME;
496     }
497     // ohos.dtbcollab.dms64_F_64
498     std::string name = SESSION_NAME_PREFIX + ownerName_ +
499         SPLIT_FLAG + CHANNEL_DATATYPE_PREFIX_MAP[dataType] + SPLIT_FLAG + channelName;
500     std::string peerSocketName = SESSION_NAME_PREFIX + peerName;
501     HILOGI("self-name: %{public}s, peerName: %{public}s", name.c_str(), peerSocketName.c_str());
502     SocketInfo socketInfo = {
503         .name = const_cast<char*>(name.c_str()),
504         .peerName = const_cast<char*>(peerSocketName.c_str()),
505         .peerNetworkId = const_cast<char*>(peerNetworkId.c_str()),
506         .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
507         .dataType = CHANNEL_SOFTBUS_DATATYPE_MAP[dataType]
508     };
509     int32_t sessionId = Socket(socketInfo);
510     HILOGI("finish, socket session id: %{public}d", sessionId);
511     return sessionId;
512 }
513 
isValidChannelId(const int32_t channelId)514 bool ChannelManager::isValidChannelId(const int32_t channelId)
515 {
516     return channelId > CHANNEL_ID_GAP && channelId <= (FILE_START_ID + CHANNEL_ID_GAP);
517 }
518 
DeleteChannel(const int32_t channelId)519 int32_t ChannelManager::DeleteChannel(const int32_t channelId)
520 {
521     HILOGI("start delete channel");
522     if (!isValidChannelId(channelId)) {
523         HILOGE("invalid channel id");
524         return INVALID_CHANNEL_ID;
525     }
526     ClearRegisterListener(channelId);
527     ClearSendTask(channelId);
528     ClearRegisterChannel(channelId);
529     ClearRegisterSocket(channelId);
530     HILOGI("end delete channel");
531     return channelId;
532 }
533 
ClearRegisterChannel(const int32_t channelId)534 void ChannelManager::ClearRegisterChannel(const int32_t channelId)
535 {
536     HILOGI("start clear channel info, channelId=%{public}d", channelId);
537     std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
538     std::string channelName;
539     auto infoIt = channelInfoMap_.find(channelId);
540     if (infoIt != channelInfoMap_.end()) {
541         channelName = infoIt->second.channelName;
542     }
543     channelInfoMap_.erase(channelId);
544 
545     auto idIt = channelIdMap_.find(channelName);
546     if (idIt != channelIdMap_.end()) {
547         idIt->second.erase(std::remove(idIt->second.begin(), idIt->second.end(), channelId), idIt->second.end());
548     }
549 }
550 
ClearRegisterListener(const int32_t channelId)551 void ChannelManager::ClearRegisterListener(const int32_t channelId)
552 {
553     HILOGI("start release listener, channelId=%{public}d", channelId);
554     std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
555     listenersMap_.erase(channelId);
556 }
557 
ClearRegisterSocket(const int32_t channelId)558 void ChannelManager::ClearRegisterSocket(const int32_t channelId)
559 {
560     HILOGI("start release socket, channelId=%{public}d", channelId);
561     std::vector<int32_t> socketIds;
562     {
563         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
564         if (!socketChannelMap_.empty()) {
565             for (auto&& socket : socketChannelMap_) {
566                 if (socket.second == channelId) {
567                     socketIds.push_back(socket.first);
568                 }
569             }
570         }
571         for (const auto socketId : socketIds) {
572             HILOGI("start release socket, %{public}d", socketId);
573             socketChannelMap_.erase(socketId);
574             socketStatusMap_.erase(socketId);
575         }
576     }
577     HILOGI("start to shutdown socket");
578     for (const auto socketId : socketIds) {
579         Shutdown(socketId);
580     }
581 }
582 
ClearSendTask(int32_t channelId)583 void ChannelManager::ClearSendTask(int32_t channelId)
584 {
585     HILOGI("clear send task for=%{public}d", channelId);
586     if (eventHandler_ != nullptr) {
587         eventHandler_->RemoveTask(std::to_string(channelId));
588     }
589 }
590 
RegisterChannelListener(const int32_t channelId,const std::shared_ptr<IChannelListener> listener)591 int32_t ChannelManager::RegisterChannelListener(const int32_t channelId,
592     const std::shared_ptr<IChannelListener> listener)
593 {
594     HILOGI("start register listener, channelId=%{public}d", channelId);
595     if (listener == nullptr) {
596         HILOGE("listener empty");
597         return INVALID_LISTENER;
598     }
599     std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
600     auto listenIt = listenersMap_.find(channelId);
601     if (listenIt == listenersMap_.end()) {
602         listenersMap_[channelId].emplace_back(listener);
603         return ERR_OK;
604     }
605     CleanInvalidListener(listenIt->second);
606     auto it = std::find_if(listenIt->second.begin(), listenIt->second.end(),
607         [&listener](const std::weak_ptr<IChannelListener> weakListener) {
608             if (auto ptr = weakListener.lock()) {
609                 return listener == ptr;
610             }
611             return false;
612         });
613     if (it != listenIt->second.end()) {
614         HILOGI("already exist listener");
615         return ERR_OK;
616     }
617     listenIt->second.emplace_back(listener);
618     return ERR_OK;
619 }
620 
CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>> & listeners)621 inline void ChannelManager::CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners)
622 {
623     listeners.erase(std::remove_if(listeners.begin(), listeners.end(),
624         [](const std::weak_ptr<IChannelListener> listener) {
625             return listener.expired();
626         }),
627         listeners.end());
628 }
629 
ConnectChannel(const int32_t channelId)630 int32_t ChannelManager::ConnectChannel(const int32_t channelId)
631 {
632     HILOGI("start to connect channel %{public}d, only allow client", channelId);
633     std::vector<int32_t> socketIds;
634     ChannelDataType dataType;
635     {
636         std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
637         auto infoIt = channelInfoMap_.find(channelId);
638         if (infoIt == channelInfoMap_.end() || infoIt->second.clientSockets.empty()) {
639             HILOGE("invalid channel id");
640             return INVALID_CHANNEL_ID;
641         }
642         dataType = infoIt->second.dataType;
643         socketIds.insert(socketIds.begin(), infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
644     }
645     HILOGI("end");
646     return DoBindSockets(socketIds, dataType);
647 }
648 
DoBindSockets(const std::vector<int32_t> & socketIds,const ChannelDataType dataType)649 int32_t ChannelManager::DoBindSockets(const std::vector<int32_t>& socketIds,
650     const ChannelDataType dataType)
651 {
652     HILOGI("start to connect sockets");
653     std::vector<std::future<int32_t>> bindTasks;
654     for (const auto& socketId : socketIds) {
655         if (GetSocketStatus(socketId) == ChannelStatus::UNCONNECTED) {
656             bindTasks.emplace_back(std::async(std::launch::async, [this, socketId, dataType]() {
657                 return BindSocket(socketId, dataType);
658             }));
659         }
660     }
661     if (bindTasks.empty()) {
662         return ERR_OK;
663     }
664     for (auto&& task : bindTasks) {
665         int32_t ret = task.get();
666         HILOGI("bind task ret=%{public}d", ret);
667         if (ret == ERR_OK) {
668             return ERR_OK;
669         }
670     }
671 
672     return CONNECT_CHANNEL_FAILED;
673 }
674 
GetSocketStatus(const int32_t socketId)675 ChannelStatus ChannelManager::GetSocketStatus(const int32_t socketId)
676 {
677     std::shared_lock<std::shared_mutex> readLock(socketMutex_);
678     auto it = socketStatusMap_.find(socketId);
679     if (it != socketStatusMap_.end()) {
680         return it->second;
681     }
682     return ChannelStatus::CONNECTED;
683 }
684 
BindSocket(const int32_t socketId,const ChannelDataType dataType)685 int32_t ChannelManager::BindSocket(const int32_t socketId, const ChannelDataType dataType)
686 {
687     QosSpeedType speedType = CHANNEL_DATATYPE_SPEED_MAP[dataType];
688     const QosTV* qos = qos_config[speedType];
689     const uint32_t qosCount = qos_speed_config[speedType];
690     HILOGI("start to bind socket, id:%{public}d, speed:%{public}d", socketId, speedType);
691     int32_t ret = Bind(socketId, qos, qosCount, &channelManagerListener);
692     HILOGI("bind end");
693     if (ret != ERR_OK) {
694         HILOGE("client bind failed, ret: %{public}d", ret);
695         return BIND_SOCKET_FAILED;
696     }
697     if (dataType == ChannelDataType::FILE) {
698         ret = SoftbusFileAdpater::GetInstance().SetFileSchema(socketId);
699     }
700     if (ret != ERR_OK) {
701         HILOGE("register %{public}d file schema failed", socketId);
702         return REGISTER_FILE_SCHEMA_FAILED;
703     }
704     SetSocketStatus(socketId, ChannelStatus::CONNECTED);
705     return ret;
706 }
707 
SetSocketStatus(const int32_t socketId,const ChannelStatus status)708 int32_t ChannelManager::SetSocketStatus(const int32_t socketId, const ChannelStatus status)
709 {
710     HILOGI("start set socket id:%{public}d status %{public}d", socketId, static_cast<int32_t>(status));
711     int32_t channelId = 0;
712     {
713         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
714         auto it = socketStatusMap_.find(socketId);
715         if (it == socketStatusMap_.end()) {
716             HILOGE("no valid socket in socketStatusMap");
717             return INVALID_SOCKET_ID;
718         }
719         it->second = status;
720         auto channelIt = socketChannelMap_.find(socketId);
721         if (channelIt == socketChannelMap_.end()) {
722             HILOGE("no valid socket in socketChannelMap");
723             return INVALID_SOCKET_ID;
724         }
725         channelId = channelIt->second;
726     }
727     auto func = [channelId, this]() {
728         UpdateChannelStatus(channelId);
729     };
730     return PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
731 }
732 
UpdateChannelStatus(const int32_t channelId)733 int32_t ChannelManager::UpdateChannelStatus(const int32_t channelId)
734 {
735     HILOGI("update channel id=%{public}d", channelId);
736     std::vector<int32_t> socketIds;
737     ChannelStatus curStatus = ChannelStatus::UNCONNECTED;
738     {
739         std::shared_lock<std::shared_mutex> readLock(channelMutex_);
740         auto infoIt = channelInfoMap_.find(channelId);
741         if (infoIt == channelInfoMap_.end()) {
742             HILOGE("no valid channelInfo for %{public}d", channelId);
743             return INVALID_CHANNEL_ID;
744         }
745         for (const auto socket : infoIt->second.clientSockets) {
746             socketIds.push_back(socket);
747         }
748         curStatus = infoIt->second.status;
749     }
750 
751     ChannelStatus newStatus = ChannelStatus::UNCONNECTED;
752     for (const auto id : socketIds) {
753         if (GetSocketStatus(id) == ChannelStatus::CONNECTED) {
754             newStatus = ChannelStatus::CONNECTED;
755             break;
756         }
757     }
758     HILOGI("curStatus:%{public}d, newStatus:%{public}d",
759         static_cast<int32_t>(curStatus), static_cast<int32_t>(newStatus));
760     if (newStatus != curStatus) {
761         return SetChannelStatus(channelId, newStatus);
762     }
763     return ERR_OK;
764 }
765 
SetChannelStatus(const int32_t channelId,const ChannelStatus status)766 int32_t ChannelManager::SetChannelStatus(const int32_t channelId, const ChannelStatus status)
767 {
768     HILOGI("set channel:%{public}d, status:%{public}d", channelId, static_cast<int32_t>(status));
769     std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
770     auto infoIt = channelInfoMap_.find(channelId);
771     if (infoIt == channelInfoMap_.end()) {
772         HILOGE("no valid channelInfo for %{public}d", channelId);
773         return INVALID_CHANNEL_ID;
774     }
775     infoIt->second.status = status;
776     return ERR_OK;
777 }
778 
OnSocketConnected(int32_t socketId,const PeerSocketInfo & info)779 void ChannelManager::OnSocketConnected(int32_t socketId, const PeerSocketInfo& info)
780 {
781     if (socketId <= 0) {
782         HILOGE("invalid socketId: %{public}d", socketId);
783         return;
784     }
785     HILOGI("socket %{public}d binded", socketId);
786     std::optional<std::string> channelNameOpt = GetChannelNameFromSocket(info.name);
787     if (!channelNameOpt) {
788         HILOGE("error socket name, %{public}s", info.name);
789         return;
790     }
791     auto& channelName = *channelNameOpt;
792     std::optional<ChannelDataType> channelType = GetChannelDataTypeFromName(channelName);
793     if (!channelType) {
794         HILOGE("error channel name, %{public}s", channelName.c_str());
795         return;
796     }
797     // remove datatype flag
798     constexpr int32_t namePrefix = 2;
799     channelName = channelName.substr(namePrefix);
800     int32_t channelId = GetChannelId(channelName, *channelType);
801     if (!isValidChannelId(channelId)) {
802         HILOGE("invalid channelid=%{public}d with channelName %{public}s", channelId, channelName.c_str());
803         return;
804     }
805     auto func = [socketId, channelId, this]() {
806         UpdateChannel(socketId, channelId);
807     };
808     PostTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
809     HILOGI("add update channel task into handler");
810 }
811 
UpdateChannel(const int32_t socketId,const int32_t channelId)812 int32_t ChannelManager::UpdateChannel(const int32_t socketId, const int32_t channelId)
813 {
814     int32_t ret = RegisterSocket(socketId, channelId);
815     if (ret != ERR_OK) {
816         HILOGE("failed to save binded socket to matching channel");
817         DoErrorCallback(channelId, ret);
818         return ret;
819     }
820     ret = SetSocketStatus(socketId, ChannelStatus::CONNECTED);
821     if (ret != ERR_OK) {
822         HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
823         DoErrorCallback(channelId, ret);
824         return ret;
825     }
826     DoConnectCallback(channelId);
827     return ret;
828 }
829 
GetChannelNameFromSocket(const std::string & socketName)830 std::optional<std::string> ChannelManager::GetChannelNameFromSocket(const std::string& socketName)
831 {
832     size_t splitPos = socketName.find(SPLIT_FLAG, CHANNEL_NAME_PREFIX_LENGTH);
833     if (splitPos == std::string::npos) {
834         HILOGE("peer socket name invalid");
835         return std::nullopt;
836     }
837     return socketName.substr(splitPos + SPLIT_FLAG.length());
838 }
839 
GetChannelDataTypeFromName(const std::string & channelName)840 std::optional<ChannelDataType> ChannelManager::GetChannelDataTypeFromName(const std::string& channelName)
841 {
842     std::string prefix = channelName.substr(0, 1);
843     for (auto&& dataType : CHANNEL_DATATYPE_PREFIX_MAP) {
844         if (prefix == dataType.second) {
845             return dataType.first;
846         }
847     }
848     return std::nullopt;
849 }
850 
GetChannelId(const std::string & channelName,const ChannelDataType dataType)851 int32_t ChannelManager::GetChannelId(const std::string& channelName, const ChannelDataType dataType)
852 {
853     HILOGI("channelName: %{public}s, dataType: %{public}d", channelName.c_str(),
854         static_cast<int32_t>(dataType));
855     std::shared_lock<std::shared_mutex> readLock(channelMutex_);
856     auto it = channelIdMap_.find(channelName);
857     if (it == channelIdMap_.end()) {
858         HILOGE("no valid channel exist");
859         return INVALID_CHANNEL_NAME;
860     }
861     for (const auto channelId : it->second) {
862         auto infoIt = channelInfoMap_.find(channelId);
863         if (infoIt == channelInfoMap_.end()) {
864             HILOGE("no valid channel exist");
865             return INVALID_CHANNEL_ID;
866         }
867         // find matching dataType
868         if (infoIt->second.dataType == dataType) {
869             return infoIt->second.channelId;
870         }
871     }
872     HILOGE("no matching channel");
873     return NO_SUCH_CHANNEL;
874 }
875 
RegisterSocket(const int32_t socketId,const int32_t channelId)876 int32_t ChannelManager::RegisterSocket(const int32_t socketId, const int32_t channelId)
877 {
878     // update channelInfo
879     HILOGI("register socket with channel, channelId=%{public}d, socketId=%{public}d", channelId, socketId);
880     ChannelDataType dataType = ChannelDataType::BYTES;
881     {
882         std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
883         auto infoIt = channelInfoMap_.find(channelId);
884         if (infoIt == channelInfoMap_.end()) {
885             HILOGE("no valid channel");
886             return INVALID_CHANNEL_ID;
887         }
888         dataType = infoIt->second.dataType;
889         infoIt->second.clientSockets.push_back(socketId);
890         infoIt->second.dataSenderReceivers[socketId] = std::make_unique<DataSenderReceiver>(socketId);
891     }
892     // update socket
893     {
894         std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
895         socketChannelMap_[socketId] = channelId;
896         socketStatusMap_[socketId] = ChannelStatus::CONNECTED;
897     }
898     if (dataType == ChannelDataType::FILE) {
899         HILOGI("file socket, regist softbus file schema");
900         int32_t ret = SoftbusFileAdpater::GetInstance().SetFileSchema(socketId);
901         if (ret != ERR_OK) {
902             HILOGE("register %{public}d file schema failed", socketId);
903             return REGISTER_FILE_SCHEMA_FAILED;
904         }
905     }
906     return ERR_OK;
907 }
908 
909 template <typename Func, typename... Args>
NotifyListeners(const int32_t channelId,Func listenerFunc,const AppExecFwk::EventQueue::Priority priority,Args &&...args)910 void ChannelManager::NotifyListeners(const int32_t channelId, Func listenerFunc,
911     const AppExecFwk::EventQueue::Priority priority, Args&&... args)
912 {
913     std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
914     auto it = listenersMap_.find(channelId);
915     if (it == listenersMap_.end() || it->second.empty()) {
916         HILOGE("no matching listener to %{public}d", channelId);
917         return;
918     }
919     auto& listeners = it->second;
920     for (const auto& listener : listeners) {
921         if (auto ptr = listener.lock()) {
922             auto func = [ptr, listenerFunc, channelId, args...]() {
923                 (ptr.get()->*listenerFunc)(channelId, std::forward<Args>(args)...);
924             };
925             PostCallbackTask(func, priority);
926         }
927     }
928 }
929 
OnSocketError(int32_t socketId,const int32_t errorCode)930 void ChannelManager::OnSocketError(int32_t socketId, const int32_t errorCode)
931 {
932     int32_t channelId = 0;
933     CHECK_SOCKET_ID(socketId);
934     CHECK_CHANNEL_ID(socketId, channelId);
935     DoErrorCallback(channelId, errorCode);
936 }
937 
DoErrorCallback(const int32_t channelId,const int32_t errorCode)938 void ChannelManager::DoErrorCallback(const int32_t channelId, const int32_t errorCode)
939 {
940     NotifyListeners(channelId, &IChannelListener::OnError,
941         AppExecFwk::EventQueue::Priority::IMMEDIATE, errorCode);
942 }
943 
DoConnectCallback(const int32_t channelId)944 void ChannelManager::DoConnectCallback(const int32_t channelId)
945 {
946     NotifyListeners(channelId, &IChannelListener::OnConnect,
947         AppExecFwk::EventQueue::Priority::IMMEDIATE);
948 }
949 
OnSocketClosed(int32_t socketId,const ShutdownReason & reason)950 void ChannelManager::OnSocketClosed(int32_t socketId, const ShutdownReason& reason)
951 {
952     int32_t channelId = 0;
953     CHECK_SOCKET_ID(socketId);
954     CHECK_CHANNEL_ID(socketId, channelId);
955     HILOGI("socket %{public}d closed, reason:%{public}d", socketId, reason);
956     int32_t ret = SetSocketStatus(socketId, ChannelStatus::UNCONNECTED);
957     if (ret != ERR_OK) {
958         HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
959         DoErrorCallback(channelId, ret);
960         return;
961     }
962     // delete channel when all socket shutdown
963     if (GetChannelStatus(channelId) == ChannelStatus::UNCONNECTED) {
964         DoDisConnectCallback(channelId, reason);
965         DeleteChannel(channelId);
966     }
967 }
968 
GetChannelId(const int32_t socketId)969 int32_t ChannelManager::GetChannelId(const int32_t socketId)
970 {
971     std::shared_lock<std::shared_mutex> readLock(socketMutex_);
972     auto it = socketChannelMap_.find(socketId);
973     if (it == socketChannelMap_.end()) {
974         HILOGE("no proper channelId to %{public}d", socketId);
975         return INVALID_SOCKET_ID;
976     }
977     return it->second;
978 }
979 
DoDisConnectCallback(const int32_t channelId,const ShutdownReason & reason)980 void ChannelManager::DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason)
981 {
982     NotifyListeners(channelId, &IChannelListener::OnDisConnect,
983         AppExecFwk::EventQueue::Priority::IMMEDIATE, reason);
984 }
985 
GetChannelStatus(const int32_t channelId)986 ChannelStatus ChannelManager::GetChannelStatus(const int32_t channelId)
987 {
988     std::shared_lock<std::shared_mutex> readLock(channelMutex_);
989     auto it = channelInfoMap_.find(channelId);
990     if (it != channelInfoMap_.end()) {
991         return it->second.status;
992     }
993     return ChannelStatus::UNCONNECTED;
994 }
995 
996 template <typename Func, typename... Args>
DoSendData(const int32_t channelId,Func doSendFunc,Args &&...args)997 int32_t ChannelManager::DoSendData(const int32_t channelId, Func doSendFunc, Args&&... args)
998 {
999     HILOGD("start to send data");
1000     int32_t socketId = GetValidSocket(channelId);
1001     if (socketId <= 0) {
1002         HILOGE("no avaliable sockets, %{public}d", channelId);
1003         return NO_CONNECTED_SOCKET_ID;
1004     }
1005     int32_t ret = ERR_OK;
1006     {
1007         std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1008         auto infoIt = channelInfoMap_.find(channelId);
1009         if (infoIt == channelInfoMap_.end()) {
1010             HILOGE("no valid channel info exist");
1011             DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1012             return INVALID_CHANNEL_ID;
1013         }
1014         auto socketIt = infoIt->second.dataSenderReceivers.find(socketId);
1015         if (socketIt == infoIt->second.dataSenderReceivers.end()) {
1016             HILOGE("no valid socket");
1017             DoErrorCallback(channelId, INVALID_SOCKET_ID);
1018             return INVALID_SOCKET_ID;
1019         }
1020         auto& senderReceiver = *(socketIt->second);
1021         ret = (senderReceiver.*doSendFunc)(std::forward<Args>(args)...);
1022     }
1023     if (ret != ERR_OK) {
1024         HILOGE("failed send data, %{public}d", ret);
1025         DoErrorCallback(channelId, ret);
1026         return ret;
1027     }
1028     return ERR_OK;
1029 }
1030 
SendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1031 int32_t ChannelManager::SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data)
1032 {
1033     if (!isValidChannelId(channelId) || data == nullptr) {
1034         HILOGE("invalid channel id. %{public}d", channelId);
1035         return INVALID_CHANNEL_ID;
1036     }
1037     HILOGI("start to send bytes, %{public}u", static_cast<uint32_t>(data->Size()));
1038     auto func = [channelId, data, this]() {
1039         DoSendBytes(channelId, data);
1040     };
1041     int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1042         std::to_string(channelId));
1043     if (ret != ERR_OK) {
1044         HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1045         return ret;
1046     }
1047     HILOGI("send bytes task added to handler");
1048     return ERR_OK;
1049 }
1050 
DoSendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1051 inline int32_t ChannelManager::DoSendBytes(const int32_t channelId,
1052     const std::shared_ptr<AVTransDataBuffer>& data)
1053 {
1054     HILOGD("start to send bytes");
1055     return DoSendData(channelId, &DataSenderReceiver::SendBytesData, data);
1056 }
1057 
GetValidSocket(const int32_t channelId)1058 int32_t ChannelManager::GetValidSocket(const int32_t channelId)
1059 {
1060     std::vector<int32_t> socketIds;
1061     {
1062         std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1063         auto infoIt = channelInfoMap_.find(channelId);
1064         if (infoIt == channelInfoMap_.end() || infoIt->second.status == ChannelStatus::UNCONNECTED) {
1065             HILOGE("invalid channelId, %{public}d", channelId);
1066             return -1;
1067         }
1068         socketIds.insert(socketIds.begin(),
1069             infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
1070     }
1071 
1072     for (const auto socketId : socketIds) {
1073         if (GetSocketStatus(socketId) == ChannelStatus::CONNECTED) {
1074             return socketId;
1075         }
1076     }
1077     return -1;
1078 }
1079 
SendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1080 int32_t ChannelManager::SendStream(const int32_t channelId,
1081     const std::shared_ptr<AVTransStreamData>& data)
1082 {
1083     if (!isValidChannelId(channelId) || data == nullptr) {
1084         HILOGE("invalid channel id");
1085         DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1086         return INVALID_CHANNEL_ID;
1087     }
1088     HILOGD("start to send stream");
1089     auto func = [=]() {
1090         DoSendStream(channelId, data);
1091     };
1092     int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1093         std::to_string(channelId));
1094     if (ret != ERR_OK) {
1095         HILOGE("failed to add send stream task, ret=%{public}d", ret);
1096         return POST_TASK_FAILED;
1097     }
1098     HILOGD("send stream task added to handler");
1099     return ERR_OK;
1100 }
1101 
DoSendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1102 int32_t ChannelManager::DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1103 {
1104     HILOGD("start to send stream");
1105     return DoSendData(channelId, &DataSenderReceiver::SendStreamData, data);
1106 }
1107 
SendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1108 int32_t ChannelManager::SendMessage(const int32_t channelId,
1109     const std::shared_ptr<AVTransDataBuffer>& data)
1110 {
1111     if (!isValidChannelId(channelId) || data == nullptr) {
1112         HILOGE("invalid channel id. %{public}d", channelId);
1113         return INVALID_CHANNEL_ID;
1114     }
1115     HILOGD("start to send message, %{public}u", static_cast<uint32_t>(data->Size()));
1116     auto func = [channelId, data, this]() {
1117         DoSendMessage(channelId, data);
1118     };
1119     int32_t ret = PostMsgTask(func, AppExecFwk::EventQueue::Priority::HIGH);
1120     if (ret != ERR_OK) {
1121         HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1122         return ret;
1123     }
1124     HILOGD("send message task added to handler");
1125     return ERR_OK;
1126 }
1127 
DoSendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1128 int32_t ChannelManager::DoSendMessage(const int32_t channelId,
1129     const std::shared_ptr<AVTransDataBuffer>& data)
1130 {
1131     HILOGI("start to send message");
1132     return DoSendData(channelId, &DataSenderReceiver::SendMessageData, data);
1133 }
1134 
SendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1135 int32_t ChannelManager::SendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1136     const std::vector<std::string>& dFiles)
1137 {
1138     HILOGI("start to send files, %{public}d", channelId);
1139     if (!isValidChannelId(channelId) || sFiles.empty() || dFiles.empty()) {
1140         HILOGE("invalid channel id. %{public}d or empty sfiles", channelId);
1141         return INVALID_PARAMETERS_ERR;
1142     }
1143     if (sFiles.size() != dFiles.size() || sFiles.size() > MAX_FILE_COUNT) {
1144         HILOGE("src size:%{public}d, dst size:%{public}d illegal",
1145             static_cast<int32_t>(sFiles.size()),
1146             static_cast<int32_t>(dFiles.size()));
1147     }
1148     int32_t ret = ERR_OK;
1149     auto func = [channelId, sFiles, dFiles, this]() {
1150         DoSendFile(channelId, sFiles, dFiles);
1151     };
1152     ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW);
1153     if (ret != ERR_OK) {
1154         HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1155         return ret;
1156     }
1157     HILOGI("send files task added to handler");
1158     return ERR_OK;
1159 }
1160 
DoSendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1161 int32_t ChannelManager::DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1162     const std::vector<std::string>& dFiles)
1163 {
1164     HILOGI("start to do send files");
1165     return DoSendData(channelId, &DataSenderReceiver::SendFileData, sFiles, dFiles);
1166 }
1167 
OnBytesReceived(int32_t socketId,const void * data,const uint32_t dataLen)1168 void ChannelManager::OnBytesReceived(int32_t socketId,
1169     const void* data, const uint32_t dataLen)
1170 {
1171     int32_t channelId = 0;
1172     CHECK_SOCKET_ID(socketId);
1173     CHECK_CHANNEL_ID(socketId, channelId);
1174     CHECK_DATA_NULL(socketId, data, OnError);
1175     HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1176     std::shared_ptr<AVTransDataBuffer> packedData = ProcessRecvData(channelId, socketId, data, dataLen);
1177     if (!packedData) {
1178         return;
1179     }
1180     DoBytesReceiveCallback(channelId, packedData);
1181 }
1182 
ProcessRecvData(const int32_t channelId,const int32_t socketId,const void * data,const uint32_t dataLen)1183 std::shared_ptr<AVTransDataBuffer> ChannelManager::ProcessRecvData(const int32_t channelId,
1184     const int32_t socketId, const void* data, const uint32_t dataLen)
1185 {
1186     std::shared_lock<std::shared_mutex> readLock(channelMutex_);
1187     const uint8_t* header = static_cast<const uint8_t*>(data);
1188     auto infoIt = channelInfoMap_.find(channelId);
1189     if (infoIt == channelInfoMap_.end()) {
1190         DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1191         return nullptr;
1192     }
1193 
1194     int32_t ret = infoIt->second.dataSenderReceivers[socketId]->PackRecvPacketData(header, dataLen);
1195     if (ret != ERR_OK) {
1196         HILOGE("pack recv data failed");
1197         DoErrorCallback(channelId, ret);
1198         return nullptr;
1199     }
1200     return infoIt->second.dataSenderReceivers[socketId]->GetPacketedData();
1201 }
1202 
DoBytesReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1203 void ChannelManager::DoBytesReceiveCallback(const int32_t channelId,
1204     const std::shared_ptr<AVTransDataBuffer>& buffer)
1205 {
1206     NotifyListeners(channelId, &IChannelListener::OnBytes,
1207         AppExecFwk::EventQueue::Priority::LOW, buffer);
1208 }
1209 
OnMessageReceived(int32_t socketId,const void * data,const uint32_t dataLen)1210 void ChannelManager::OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen)
1211 {
1212     int32_t channelId = 0;
1213     CHECK_SOCKET_ID(socketId);
1214     CHECK_CHANNEL_ID(socketId, channelId);
1215     CHECK_DATA_NULL(socketId, data, OnError);
1216     HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1217     std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(dataLen);
1218     int32_t ret = memcpy_s(buffer->Data(),
1219         buffer->Size(), data, dataLen);
1220     if (ret != ERR_OK) {
1221         HILOGE("pack recv data failed");
1222         DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1223         return;
1224     }
1225     DoMessageReceiveCallback(channelId, buffer);
1226 }
1227 
DoMessageReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1228 void ChannelManager::DoMessageReceiveCallback(const int32_t channelId,
1229     const std::shared_ptr<AVTransDataBuffer>& buffer)
1230 {
1231     NotifyListeners(channelId, &IChannelListener::OnMessage,
1232         AppExecFwk::EventQueue::Priority::HIGH, buffer);
1233 }
1234 
OnStreamReceived(int32_t socketId,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)1235 void ChannelManager::OnStreamReceived(int32_t socketId, const StreamData* data,
1236     const StreamData* ext, const StreamFrameInfo* param)
1237 {
1238     int32_t channelId = 0;
1239     CHECK_SOCKET_ID(socketId);
1240     CHECK_CHANNEL_ID(socketId, channelId);
1241     CHECK_DATA_NULL(socketId, data, OnError);
1242     CHECK_DATA_NULL(socketId, ext, OnError);
1243     std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(data->bufLen);
1244     int32_t ret = memcpy_s(buffer->Data(), buffer->Size(), data->buf, data->bufLen);
1245     if (ret != ERR_OK) {
1246         HILOGE("copy stream data failed, %{public}d", socketId);
1247         DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1248         return;
1249     }
1250     AVTransStreamDataExt streamDataExt;
1251     std::shared_ptr<AVTransStreamData> streamData = std::make_shared<AVTransStreamData>(buffer, streamDataExt);
1252     ret = streamData->DeserializeStreamDataExt(ext->buf);
1253     if (ret != ERR_OK) {
1254         HILOGE("deserialize stream ext failed, %{public}d", socketId);
1255         DoErrorCallback(channelId, PARSE_AV_TRANS_STREAM_EXT_FAILED);
1256         return;
1257     }
1258     DoStreamReceiveCallback(channelId, streamData);
1259 }
1260 
DoStreamReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1261 void ChannelManager::DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1262 {
1263     NotifyListeners(channelId, &IChannelListener::OnStream,
1264         AppExecFwk::EventQueue::Priority::LOW, data);
1265 }
1266 
OnFileEventReceived(int32_t socketId,FileEvent * event)1267 void ChannelManager::OnFileEventReceived(int32_t socketId, FileEvent *event)
1268 {
1269     int32_t channelId = 0;
1270     CHECK_SOCKET_ID(socketId);
1271     // update recv path before onbind
1272     if (event == nullptr) {
1273         HILOGE("socket %{public}d event empty", socketId);
1274         return;
1275     }
1276     if (event->type == FileEventType::FILE_EVENT_RECV_UPDATE_PATH) {
1277         HILOGI("start to set update path func, %{public}d", socketId);
1278         return DispatchProcessFileEvent(fileChannelId_.load(), event);
1279     }
1280     CHECK_CHANNEL_ID(socketId, channelId);
1281     CHECK_DATA_NULL(socketId, event, OnError);
1282     HILOGI("start to dispatch file event, %{public}d", channelId);
1283     DispatchProcessFileEvent(channelId, event);
1284 }
1285 
DispatchProcessFileEvent(int32_t channelId,FileEvent * event)1286 void ChannelManager::DispatchProcessFileEvent(int32_t channelId, FileEvent *event)
1287 {
1288     HILOGI("start to dispatch file event");
1289     switch (event->type) {
1290         case FileEventType::FILE_EVENT_SEND_PROCESS:
1291         case FileEventType::FILE_EVENT_SEND_FINISH: {
1292             DealFileSendEvent(channelId, event);
1293             break;
1294         }
1295         case FileEventType::FILE_EVENT_RECV_START:
1296         case FileEventType::FILE_EVENT_RECV_PROCESS:
1297         case FileEventType::FILE_EVENT_RECV_FINISH: {
1298             DealFileRecvEvent(channelId, event);
1299             break;
1300         }
1301         case FileEventType::FILE_EVENT_BUTT:
1302         case FileEventType::FILE_EVENT_SEND_ERROR:
1303         case FileEventType::FILE_EVENT_RECV_ERROR: {
1304             DealFileErrorEvent(channelId, event);
1305             break;
1306         }
1307         case FileEventType::FILE_EVENT_RECV_UPDATE_PATH: {
1308             DealFileUpdatePathEvent(channelId, event);
1309             break;
1310         }
1311         default:
1312             break;
1313     }
1314 }
1315 
DealFileSendEvent(int32_t channelId,FileEvent * event)1316 void ChannelManager::DealFileSendEvent(int32_t channelId, FileEvent *event)
1317 {
1318     HILOGI("start to deal file send event, %{public}d", channelId);
1319     FileInfo info;
1320     if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1321         info.commonInfo.eventType = ChannelFileEvent::SEND_PROCESS;
1322     } else {
1323         info.commonInfo.eventType = ChannelFileEvent::SEND_FINISH;
1324     }
1325     info.commonInfo.fileCnt = event->fileCnt;
1326     for (uint32_t i = 0; i < event->fileCnt; ++i) {
1327         info.commonInfo.fileList.push_back(std::string(event->files[i]));
1328     }
1329 
1330     FileSendInfo sendInfo;
1331     sendInfo.bytesProcessed = event->bytesProcessed;
1332     sendInfo.bytesTotal = event->bytesTotal;
1333     if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1334         sendInfo.rate = event->rate;
1335     }
1336     info.sendInfo = sendInfo;
1337     DoFileSendCallback(channelId, info);
1338     HILOGI("end");
1339 }
1340 
DealFileRecvEvent(int32_t channelId,FileEvent * event)1341 void ChannelManager::DealFileRecvEvent(int32_t channelId, FileEvent *event)
1342 {
1343     HILOGI("start to deal file recv event, %{public}d", channelId);
1344     FileInfo info;
1345     if (event->type == FileEventType::FILE_EVENT_RECV_START) {
1346         info.commonInfo.eventType = ChannelFileEvent::RECV_START;
1347     } else if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1348         info.commonInfo.eventType = ChannelFileEvent::RECV_PROCESS;
1349     } else {
1350         info.commonInfo.eventType = ChannelFileEvent::RECV_FINISH;
1351     }
1352     info.commonInfo.fileCnt = event->fileCnt;
1353     for (uint32_t i = 0; i < event->fileCnt; ++i) {
1354         info.commonInfo.fileList.push_back(std::string(event->files[i]));
1355     }
1356 
1357     FileRecvInfo recvInfo;
1358     recvInfo.bytesProcessed = event->bytesProcessed;
1359     recvInfo.bytesTotal = event->bytesTotal;
1360     if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1361         recvInfo.rate = event->rate;
1362     }
1363     info.recvInfo = recvInfo;
1364     DoFileRecvCallback(channelId, info);
1365     HILOGI("end");
1366 }
1367 
DealFileErrorEvent(int32_t channelId,FileEvent * event)1368 void ChannelManager::DealFileErrorEvent(int32_t channelId, FileEvent *event)
1369 {
1370     HILOGI("start to deal file error event, %{public}d", channelId);
1371     FileInfo info;
1372     if (event->type == FileEventType::FILE_EVENT_SEND_ERROR) {
1373         info.commonInfo.eventType = ChannelFileEvent::SEND_ERROR;
1374     } else {
1375         info.commonInfo.eventType = ChannelFileEvent::RECV_ERROR;
1376     }
1377     info.commonInfo.fileCnt = event->fileCnt;
1378     for (uint32_t i = 0; i < event->fileCnt; ++i) {
1379         info.commonInfo.fileList.push_back(std::string(event->files[i]));
1380     }
1381 
1382     FileErrorInfo errorInfo;
1383     errorInfo.errorCode = event->errorCode;
1384     info.errorInfo = errorInfo;
1385     if (info.commonInfo.eventType == ChannelFileEvent::RECV_ERROR) {
1386         DoFileRecvCallback(channelId, info);
1387     } else {
1388         DoFileSendCallback(channelId, info);
1389     }
1390     HILOGI("end");
1391 }
1392 
DealFileUpdatePathEvent(int32_t channelId,FileEvent * event)1393 void ChannelManager::DealFileUpdatePathEvent(int32_t channelId, FileEvent *event)
1394 {
1395     HILOGI("start to deal file update path event, %{public}d", channelId);
1396     event->UpdateRecvPath = GetRecvPath;
1397     HILOGI("end");
1398 }
1399 
GetRecvPathFromUser()1400 const char* ChannelManager::GetRecvPathFromUser()
1401 {
1402     HILOGI("get recv path from user");
1403     int32_t channelId = static_cast<int32_t>(fileChannelId_.load());
1404     std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1405     auto it = listenersMap_.find(channelId);
1406     if (it == listenersMap_.end() || it->second.empty()) {
1407         HILOGE("no matching listener to %{public}d", channelId);
1408         return nullptr;
1409     }
1410     auto& listeners = it->second;
1411     for (const auto& listener : listeners) {
1412         if (auto ptr = listener.lock()) {
1413             return ptr->GetRecvPath(channelId);
1414         }
1415     }
1416     return nullptr;
1417 }
1418 
DoFileRecvCallback(const int32_t channelId,const FileInfo & info)1419 void ChannelManager::DoFileRecvCallback(const int32_t channelId, const FileInfo& info)
1420 {
1421     NotifyListeners(channelId, &IChannelListener::OnRecvFile,
1422         AppExecFwk::EventQueue::Priority::HIGH, info);
1423 }
1424 
DoFileSendCallback(const int32_t channelId,const FileInfo & info)1425 void ChannelManager::DoFileSendCallback(const int32_t channelId, const FileInfo& info)
1426 {
1427     NotifyListeners(channelId, &IChannelListener::OnSendFile,
1428         AppExecFwk::EventQueue::Priority::HIGH, info);
1429 }
1430 }
1431 }
1432