1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "channel_manager.h"
17 #include "dtbcollabmgr_log.h"
18 #include "softbus_file_adapter.h"
19 #include <algorithm>
20 #include <chrono>
21 #include <future>
22 #include <sys/prctl.h>
23 #include "securec.h"
24 #include <unordered_set>
25
26 namespace OHOS {
27 namespace DistributedCollab {
28 IMPLEMENT_SINGLE_INSTANCE(ChannelManager);
29 namespace {
30 static const std::string TAG = "DSchedCollabChannelManager";
31
32 enum class QosSpeedType {
33 HIGH,
34 LOW
35 };
36
37 static std::map<ChannelDataType, TransDataType> CHANNEL_SOFTBUS_DATATYPE_MAP = {
38 { ChannelDataType::MESSAGE, DATA_TYPE_MESSAGE },
39 { ChannelDataType::BYTES, DATA_TYPE_BYTES },
40 { ChannelDataType::VIDEO_STREAM, DATA_TYPE_VIDEO_STREAM },
41 { ChannelDataType::FILE, DATA_TYPE_FILE }
42 };
43
44 static std::map<ChannelDataType, QosSpeedType> CHANNEL_DATATYPE_SPEED_MAP = {
45 { ChannelDataType::MESSAGE, QosSpeedType::LOW },
46 { ChannelDataType::BYTES, QosSpeedType::LOW },
47 { ChannelDataType::VIDEO_STREAM, QosSpeedType::HIGH },
48 { ChannelDataType::FILE, QosSpeedType::HIGH }
49 };
50
51 static const std::string SPLIT_FLAG = "_";
52 static const std::string COLLAB_PGK_NAME = "dms";
53 static const std::string SESSION_NAME_PREFIX = "ohos.dtbcollab.dms";
54
55 static std::map<ChannelDataType, std::string> CHANNEL_DATATYPE_PREFIX_MAP = {
56 { ChannelDataType::MESSAGE, "M" },
57 { ChannelDataType::BYTES, "B" },
58 { ChannelDataType::VIDEO_STREAM, "V" },
59 { ChannelDataType::FILE, "F" }
60 };
61
62 static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
63 static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY = 10000;
64 static constexpr int32_t DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY = 2000;
65
66 static QosTV g_low_qosInfo[] = {
67 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_BW },
68 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MAX_LATENCY },
69 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_LOW_QOS_TYPE_MIN_LATENCY },
70 { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
71 };
72
73 static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW = 4 * 1024 * 1024;
74 static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY = 10000;
75 static constexpr int32_t DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY = 2000;
76
77 static QosTV g_high_qosInfo[] = {
78 { .qos = QOS_TYPE_MIN_BW, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_BW },
79 { .qos = QOS_TYPE_MAX_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MAX_LATENCY },
80 { .qos = QOS_TYPE_MIN_LATENCY, .value = DSCHED_COLLAB_HIGH_QOS_TYPE_MIN_LATENCY },
81 { .qos = QOS_TYPE_MAX_IDLE_TIMEOUT, .value = 60 * 60 * 1000 }
82 };
83
84 static std::map<QosSpeedType, QosTV*> qos_config = {
85 { QosSpeedType::HIGH, g_high_qosInfo },
86 { QosSpeedType::LOW, g_low_qosInfo }
87 };
88
89 static uint32_t g_lowQosTvParamIndex = static_cast<uint32_t>(sizeof(g_low_qosInfo) / sizeof(QosTV));
90 static uint32_t g_highQosTvParamIndex = static_cast<uint32_t>(sizeof(g_high_qosInfo) / sizeof(QosTV));
91 static std::map<QosSpeedType, uint32_t> qos_speed_config = {
92 { QosSpeedType::HIGH, g_highQosTvParamIndex },
93 { QosSpeedType::LOW, g_lowQosTvParamIndex }
94 };
95
96 #define CHECK_SOCKET_ID(socketId) \
97 do { \
98 if ((socketId) <= 0) { \
99 HILOGE("invalid socket id, %{public}d", socketId); \
100 return; \
101 } \
102 } while (0)
103
104 #define CHECK_CHANNEL_ID(socketId, channelId) \
105 do { \
106 (channelId) = GetChannelId(socketId); \
107 if (!isValidChannelId(channelId)) { \
108 HILOGE("invalid socket id %{public}d, can't find channel", socketId); \
109 return; \
110 } \
111 } while (0)
112
113 #define CHECK_DATA_NULL(socketId, data, errorHandler) \
114 do { \
115 if ((data) == nullptr) { \
116 HILOGE("receive empty bytes data, socketId=%{public}d", socketId); \
117 (errorHandler)(socketId, RECV_DATA_EMPTY); \
118 return; \
119 } \
120 } while (0)
121 }
122
OnSocketConnected(int32_t socket,PeerSocketInfo info)123 static void OnSocketConnected(int32_t socket, PeerSocketInfo info)
124 {
125 ChannelManager::GetInstance().OnSocketConnected(socket, info);
126 }
127
OnSocketClosed(int32_t socket,ShutdownReason reason)128 static void OnSocketClosed(int32_t socket, ShutdownReason reason)
129 {
130 ChannelManager::GetInstance().OnSocketClosed(socket, reason);
131 }
132
OnBytesRecv(int32_t socket,const void * data,uint32_t dataLen)133 static void OnBytesRecv(int32_t socket, const void* data, uint32_t dataLen)
134 {
135 ChannelManager::GetInstance().OnBytesReceived(socket, data, dataLen);
136 }
137
OnMessageRecv(int32_t socket,const void * data,uint32_t dataLen)138 static void OnMessageRecv(int32_t socket, const void* data, uint32_t dataLen)
139 {
140 ChannelManager::GetInstance().OnMessageReceived(socket, data, dataLen);
141 }
142
OnStreamRecv(int32_t socket,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)143 static void OnStreamRecv(int32_t socket, const StreamData* data, const StreamData* ext,
144 const StreamFrameInfo* param)
145 {
146 ChannelManager::GetInstance().OnStreamReceived(socket, data, ext, param);
147 }
148
OnError(int32_t socket,int32_t errCode)149 static void OnError(int32_t socket, int32_t errCode)
150 {
151 ChannelManager::GetInstance().OnSocketError(socket, errCode);
152 }
153
OnFileEvent(int32_t socket,FileEvent * event)154 static void OnFileEvent(int32_t socket, FileEvent *event)
155 {
156 ChannelManager::GetInstance().OnFileEventReceived(socket, event);
157 }
158
GetRecvPath()159 static const char* GetRecvPath()
160 {
161 return ChannelManager::GetInstance().GetRecvPathFromUser();
162 }
163
164 ISocketListener channelManagerListener = {
165 .OnBind = OnSocketConnected,
166 .OnShutdown = OnSocketClosed,
167 .OnBytes = OnBytesRecv,
168 .OnMessage = OnMessageRecv,
169 .OnStream = OnStreamRecv,
170 .OnFile = OnFileEvent,
171 .OnError = OnError,
172 };
173
~ChannelManager()174 ChannelManager::~ChannelManager()
175 {
176 DeInit();
177 };
178
Init(const std::string & ownerName)179 int32_t ChannelManager::Init(const std::string& ownerName)
180 {
181 HILOGI("start init channel manager");
182 if (eventHandler_ != nullptr && callbackEventHandler_ != nullptr && msgEventHandler_ != nullptr) {
183 HILOGW("server channel already init");
184 return ERR_OK;
185 }
186 if (serverSocketId_ > 0) {
187 HILOGW("server socket already init");
188 return ERR_OK;
189 }
190 ownerName_ = ownerName;
191
192 eventThread_ = std::thread(&ChannelManager::StartEvent, this);
193 std::unique_lock<std::mutex> lock(eventMutex_);
194 eventCon_.wait(lock, [this] {
195 return eventHandler_ != nullptr;
196 });
197
198 callbackEventThread_ = std::thread(&ChannelManager::StartCallbackEvent, this);
199 std::unique_lock<std::mutex> callbackLock(callbackEventMutex_);
200 callbackEventCon_.wait(callbackLock, [this] {
201 return callbackEventHandler_ != nullptr;
202 });
203
204 msgEventThread_ = std::thread(&ChannelManager::StartMsgEvent, this);
205 std::unique_lock<std::mutex> msgLock(msgEventMutex_);
206 msgEventCon_.wait(msgLock, [this] {
207 return msgEventHandler_ != nullptr;
208 });
209
210 int32_t socketServerId = CreateServerSocket();
211 if (socketServerId <= 0) {
212 HILOGE("create socket failed, ret: %{public}d", socketServerId);
213 return CREATE_SOCKET_FAILED;
214 }
215 int32_t ret = Listen(socketServerId, g_low_qosInfo,
216 g_lowQosTvParamIndex, &channelManagerListener);
217 if (ret != ERR_OK) {
218 HILOGE("service listen failed, ret: %{public}d", ret);
219 return LISTEN_SOCKET_FAILED;
220 }
221 serverSocketId_ = socketServerId;
222 HILOGI("end");
223 return ERR_OK;
224 }
225
StartEvent()226 void ChannelManager::StartEvent()
227 {
228 HILOGI("StartEvent start");
229 prctl(PR_SET_NAME, ownerName_.c_str());
230 auto runner = AppExecFwk::EventRunner::Create(false);
231 {
232 std::lock_guard<std::mutex> lock(eventMutex_);
233 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
234 }
235 eventCon_.notify_one();
236 runner->Run();
237 HILOGI("StartEvent end");
238 }
239
StartCallbackEvent()240 void ChannelManager::StartCallbackEvent()
241 {
242 HILOGI("Start callback event start");
243 std::string callbackName = ownerName_ + "callback";
244 prctl(PR_SET_NAME, callbackName.c_str());
245 auto runner = AppExecFwk::EventRunner::Create(false);
246 {
247 std::lock_guard<std::mutex> lock(callbackEventMutex_);
248 callbackEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
249 }
250 callbackEventCon_.notify_one();
251 runner->Run();
252 HILOGI("callback event end");
253 }
254
StartMsgEvent()255 void ChannelManager::StartMsgEvent()
256 {
257 HILOGI("Start msg event start");
258 std::string msgName = ownerName_ + "msg";
259 prctl(PR_SET_NAME, msgName.c_str());
260 auto runner = AppExecFwk::EventRunner::Create(false);
261 {
262 std::lock_guard<std::mutex> lock(msgEventMutex_);
263 msgEventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
264 }
265 msgEventCon_.notify_one();
266 runner->Run();
267 HILOGI("msg event end");
268 }
269
PostTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority,const std::string & name)270 int32_t ChannelManager::PostTask(const AppExecFwk::InnerEvent::Callback& callback,
271 const AppExecFwk::EventQueue::Priority priority, const std::string& name)
272 {
273 if (eventHandler_ == nullptr) {
274 HILOGE("event handler empty");
275 return NULL_EVENT_HANDLER;
276 }
277 if (eventHandler_->PostTask(callback, name, 0, priority)) {
278 return ERR_OK;
279 }
280 HILOGE("add task failed");
281 return POST_TASK_FAILED;
282 }
283
PostCallbackTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)284 int32_t ChannelManager::PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback,
285 const AppExecFwk::EventQueue::Priority priority)
286 {
287 if (callbackEventHandler_ == nullptr) {
288 HILOGE("callback event handler empty");
289 return NULL_EVENT_HANDLER;
290 }
291 if (callbackEventHandler_->PostTask(callback, priority)) {
292 return ERR_OK;
293 }
294 HILOGE("add callback task failed");
295 return POST_TASK_FAILED;
296 }
297
PostMsgTask(const AppExecFwk::InnerEvent::Callback & callback,const AppExecFwk::EventQueue::Priority priority)298 int32_t ChannelManager::PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback,
299 const AppExecFwk::EventQueue::Priority priority)
300 {
301 if (msgEventHandler_ == nullptr) {
302 HILOGE("msg event handler empty");
303 return NULL_EVENT_HANDLER;
304 }
305 if (msgEventHandler_->PostTask(callback, priority)) {
306 return ERR_OK;
307 }
308 HILOGE("add msg task failed");
309 return POST_TASK_FAILED;
310 }
311
CreateServerSocket()312 int32_t ChannelManager::CreateServerSocket()
313 {
314 HILOGI("start create server socket");
315 std::string sessionName = SESSION_NAME_PREFIX + ownerName_;
316 HILOGI("sessionName: %{public}s, size: %{public}zu", sessionName.c_str(),
317 sessionName.length());
318 SocketInfo info = {
319 .name = const_cast<char*>(sessionName.c_str()),
320 .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
321 };
322 int32_t socket = Socket(info);
323 HILOGI("finish, socket id: %{public}d", socket);
324 return socket;
325 }
326
DeInit()327 void ChannelManager::DeInit()
328 {
329 HILOGI("start deinit channel manager");
330 // stop all task
331 if (eventHandler_ != nullptr) {
332 eventHandler_->GetEventRunner()->Stop();
333 if (eventThread_.joinable()) {
334 eventThread_.join();
335 }
336 eventHandler_ = nullptr;
337 } else {
338 HILOGE("eventHandler_ is nullptr");
339 }
340
341 // stop callback task
342 if (callbackEventHandler_ != nullptr) {
343 callbackEventHandler_->GetEventRunner()->Stop();
344 if (callbackEventThread_.joinable()) {
345 callbackEventThread_.join();
346 }
347 callbackEventHandler_ = nullptr;
348 } else {
349 HILOGE("callbackEventHandler_ is nullptr");
350 }
351
352 // stop msg task
353 if (msgEventHandler_ != nullptr) {
354 msgEventHandler_->GetEventRunner()->Stop();
355 if (msgEventThread_.joinable()) {
356 msgEventThread_.join();
357 }
358 msgEventHandler_ = nullptr;
359 } else {
360 HILOGE("msgEventHandler_ is nullptr");
361 }
362
363 // release channels
364 std::unordered_set<int32_t> channelIds;
365 for (const auto& entry : channelIdMap_) {
366 for (int32_t id : entry.second) {
367 channelIds.insert(id);
368 }
369 }
370 for (const int32_t id : channelIds) {
371 DeleteChannel(id);
372 }
373 Shutdown(serverSocketId_);
374 Reset();
375 HILOGI("end");
376 }
377
Reset()378 void ChannelManager::Reset()
379 {
380 HILOGI("reset channel manager");
381 serverSocketId_ = -1;
382 ownerName_ = "";
383 nextIds_ = {
384 { ChannelDataType::MESSAGE, MESSAGE_START_ID },
385 { ChannelDataType::BYTES, BYTES_START_ID },
386 { ChannelDataType::VIDEO_STREAM, STREAM_START_ID },
387 { ChannelDataType::FILE, FILE_START_ID }
388 };
389 }
390
GetVersion()391 int32_t ChannelManager::GetVersion()
392 {
393 return VERSION_;
394 }
395
CreateServerChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)396 int32_t ChannelManager::CreateServerChannel(const std::string& channelName,
397 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
398 {
399 HILOGI("start to creat server channel waiting for connect");
400 std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
401 if (!info) {
402 HILOGE("Create server channel failed");
403 return CREATE_SERVER_CHANNEL_FAILED;
404 }
405 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
406 channelIdMap_[channelName].push_back(info->channelId);
407 channelInfoMap_.emplace(info->channelId, std::move(*info));
408 // save file channel
409 fileChannelId_.store(info->channelId);
410 HILOGI("end");
411 return info->channelId;
412 }
413
CreateClientChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)414 int32_t ChannelManager::CreateClientChannel(const std::string& channelName,
415 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
416 {
417 HILOGI("start to creat client channel to connect other");
418 std::optional<ChannelInfo> info = CreateBaseChannel(channelName, dataType, peerInfo);
419 if (!info) {
420 HILOGE("Create client channel failed");
421 return CREATE_CLIENT_CHANNEL_FAILED;
422 }
423 int32_t ret = RegisterSocket(*info, dataType);
424 HILOGI("end");
425 return ret == ERR_OK ? info->channelId : ret;
426 };
427
CreateBaseChannel(const std::string & channelName,const ChannelDataType dataType,const ChannelPeerInfo & peerInfo)428 std::optional<ChannelInfo> ChannelManager::CreateBaseChannel(const std::string& channelName,
429 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo)
430 {
431 HILOGI("start create base channel, dataType=%{public}d, name=%{public}s",
432 static_cast<int32_t>(dataType), channelName.c_str());
433 int32_t channelId = GenerateNextId(dataType);
434 if (!isValidChannelId(channelId)) {
435 HILOGE("Get channel id failed, id=%{public}d", channelId);
436 return std::nullopt;
437 }
438 ChannelInfo info;
439 info.channelId = channelId;
440 info.channelName = channelName;
441 info.status = ChannelStatus::UNCONNECTED;
442 info.dataType = dataType;
443 info.peerInfo = peerInfo;
444 return info;
445 }
446
GenerateNextId(const ChannelDataType dataType)447 int32_t ChannelManager::GenerateNextId(const ChannelDataType dataType)
448 {
449 int32_t channelId = 0;
450 // lock for each type
451 std::lock_guard<std::mutex> typeLock(typeMutex_[dataType]);
452 HILOGI("create socket for %{public}d", static_cast<int32_t>(dataType));
453 channelId = nextIds_[dataType];
454 if (channelId - CHANNEL_ID_GAP * (static_cast<int32_t>(dataType) + 1)
455 >= CHANNEL_ID_GAP) {
456 HILOGE("type %{public}d exceed max channel",
457 static_cast<int32_t>(dataType));
458 return CHANNEL_NUM_EXCEED_LIMIT;
459 }
460 nextIds_[dataType]++;
461 return channelId;
462 }
463
RegisterSocket(ChannelInfo & info,const ChannelDataType dataType)464 int32_t ChannelManager::RegisterSocket(ChannelInfo& info, const ChannelDataType dataType)
465 {
466 int32_t clientSocketId = CreateClientSocket(info.channelName,
467 info.peerInfo.peerName, info.peerInfo.networkId, dataType);
468 if (clientSocketId <= 0) {
469 HILOGE("create socket failed, ret: %{public}d", clientSocketId);
470 return CREATE_SOCKET_FAILED;
471 }
472 // save info to each map
473 {
474 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
475 socketChannelMap_.emplace(clientSocketId, info.channelId);
476 socketStatusMap_.emplace(clientSocketId, ChannelStatus::UNCONNECTED);
477 }
478 HILOGI("register channel name: %{public}s", info.channelName.c_str());
479 {
480 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
481 info.clientSockets.push_back(clientSocketId);
482 info.dataSenderReceivers[clientSocketId] = std::make_unique<DataSenderReceiver>(clientSocketId);
483 channelIdMap_[info.channelName].push_back(info.channelId);
484 channelInfoMap_.emplace(info.channelId, std::move(info));
485 }
486 return ERR_OK;
487 }
488
CreateClientSocket(const std::string & channelName,const std::string & peerName,const std::string & peerNetworkId,const ChannelDataType dataType)489 int32_t ChannelManager::CreateClientSocket(const std::string& channelName,
490 const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType)
491 {
492 HILOGI("start");
493 if (channelName.length() > MAX_CHANNEL_NAME_LENGTH) {
494 HILOGE("channel name too long, %{public}s", channelName.c_str());
495 return -INVALID_CHANNEL_NAME;
496 }
497 // ohos.dtbcollab.dms64_F_64
498 std::string name = SESSION_NAME_PREFIX + ownerName_ +
499 SPLIT_FLAG + CHANNEL_DATATYPE_PREFIX_MAP[dataType] + SPLIT_FLAG + channelName;
500 std::string peerSocketName = SESSION_NAME_PREFIX + peerName;
501 HILOGI("self-name: %{public}s, peerName: %{public}s", name.c_str(), peerSocketName.c_str());
502 SocketInfo socketInfo = {
503 .name = const_cast<char*>(name.c_str()),
504 .peerName = const_cast<char*>(peerSocketName.c_str()),
505 .peerNetworkId = const_cast<char*>(peerNetworkId.c_str()),
506 .pkgName = const_cast<char*>(COLLAB_PGK_NAME.c_str()),
507 .dataType = CHANNEL_SOFTBUS_DATATYPE_MAP[dataType]
508 };
509 int32_t sessionId = Socket(socketInfo);
510 HILOGI("finish, socket session id: %{public}d", sessionId);
511 return sessionId;
512 }
513
isValidChannelId(const int32_t channelId)514 bool ChannelManager::isValidChannelId(const int32_t channelId)
515 {
516 return channelId > CHANNEL_ID_GAP && channelId <= (FILE_START_ID + CHANNEL_ID_GAP);
517 }
518
DeleteChannel(const int32_t channelId)519 int32_t ChannelManager::DeleteChannel(const int32_t channelId)
520 {
521 HILOGI("start delete channel");
522 if (!isValidChannelId(channelId)) {
523 HILOGE("invalid channel id");
524 return INVALID_CHANNEL_ID;
525 }
526 ClearRegisterListener(channelId);
527 ClearSendTask(channelId);
528 ClearRegisterChannel(channelId);
529 ClearRegisterSocket(channelId);
530 HILOGI("end delete channel");
531 return channelId;
532 }
533
ClearRegisterChannel(const int32_t channelId)534 void ChannelManager::ClearRegisterChannel(const int32_t channelId)
535 {
536 HILOGI("start clear channel info, channelId=%{public}d", channelId);
537 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
538 std::string channelName;
539 auto infoIt = channelInfoMap_.find(channelId);
540 if (infoIt != channelInfoMap_.end()) {
541 channelName = infoIt->second.channelName;
542 }
543 channelInfoMap_.erase(channelId);
544
545 auto idIt = channelIdMap_.find(channelName);
546 if (idIt != channelIdMap_.end()) {
547 idIt->second.erase(std::remove(idIt->second.begin(), idIt->second.end(), channelId), idIt->second.end());
548 }
549 }
550
ClearRegisterListener(const int32_t channelId)551 void ChannelManager::ClearRegisterListener(const int32_t channelId)
552 {
553 HILOGI("start release listener, channelId=%{public}d", channelId);
554 std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
555 listenersMap_.erase(channelId);
556 }
557
ClearRegisterSocket(const int32_t channelId)558 void ChannelManager::ClearRegisterSocket(const int32_t channelId)
559 {
560 HILOGI("start release socket, channelId=%{public}d", channelId);
561 std::vector<int32_t> socketIds;
562 {
563 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
564 if (!socketChannelMap_.empty()) {
565 for (auto&& socket : socketChannelMap_) {
566 if (socket.second == channelId) {
567 socketIds.push_back(socket.first);
568 }
569 }
570 }
571 for (const auto socketId : socketIds) {
572 HILOGI("start release socket, %{public}d", socketId);
573 socketChannelMap_.erase(socketId);
574 socketStatusMap_.erase(socketId);
575 }
576 }
577 HILOGI("start to shutdown socket");
578 for (const auto socketId : socketIds) {
579 Shutdown(socketId);
580 }
581 }
582
ClearSendTask(int32_t channelId)583 void ChannelManager::ClearSendTask(int32_t channelId)
584 {
585 HILOGI("clear send task for=%{public}d", channelId);
586 if (eventHandler_ != nullptr) {
587 eventHandler_->RemoveTask(std::to_string(channelId));
588 }
589 }
590
RegisterChannelListener(const int32_t channelId,const std::shared_ptr<IChannelListener> listener)591 int32_t ChannelManager::RegisterChannelListener(const int32_t channelId,
592 const std::shared_ptr<IChannelListener> listener)
593 {
594 HILOGI("start register listener, channelId=%{public}d", channelId);
595 if (listener == nullptr) {
596 HILOGE("listener empty");
597 return INVALID_LISTENER;
598 }
599 std::unique_lock<std::shared_mutex> writeLock(listenerMutex_);
600 auto listenIt = listenersMap_.find(channelId);
601 if (listenIt == listenersMap_.end()) {
602 listenersMap_[channelId].emplace_back(listener);
603 return ERR_OK;
604 }
605 CleanInvalidListener(listenIt->second);
606 auto it = std::find_if(listenIt->second.begin(), listenIt->second.end(),
607 [&listener](const std::weak_ptr<IChannelListener> weakListener) {
608 if (auto ptr = weakListener.lock()) {
609 return listener == ptr;
610 }
611 return false;
612 });
613 if (it != listenIt->second.end()) {
614 HILOGI("already exist listener");
615 return ERR_OK;
616 }
617 listenIt->second.emplace_back(listener);
618 return ERR_OK;
619 }
620
CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>> & listeners)621 inline void ChannelManager::CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners)
622 {
623 listeners.erase(std::remove_if(listeners.begin(), listeners.end(),
624 [](const std::weak_ptr<IChannelListener> listener) {
625 return listener.expired();
626 }),
627 listeners.end());
628 }
629
ConnectChannel(const int32_t channelId)630 int32_t ChannelManager::ConnectChannel(const int32_t channelId)
631 {
632 HILOGI("start to connect channel %{public}d, only allow client", channelId);
633 std::vector<int32_t> socketIds;
634 ChannelDataType dataType;
635 {
636 std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
637 auto infoIt = channelInfoMap_.find(channelId);
638 if (infoIt == channelInfoMap_.end() || infoIt->second.clientSockets.empty()) {
639 HILOGE("invalid channel id");
640 return INVALID_CHANNEL_ID;
641 }
642 dataType = infoIt->second.dataType;
643 socketIds.insert(socketIds.begin(), infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
644 }
645 HILOGI("end");
646 return DoBindSockets(socketIds, dataType);
647 }
648
DoBindSockets(const std::vector<int32_t> & socketIds,const ChannelDataType dataType)649 int32_t ChannelManager::DoBindSockets(const std::vector<int32_t>& socketIds,
650 const ChannelDataType dataType)
651 {
652 HILOGI("start to connect sockets");
653 std::vector<std::future<int32_t>> bindTasks;
654 for (const auto& socketId : socketIds) {
655 if (GetSocketStatus(socketId) == ChannelStatus::UNCONNECTED) {
656 bindTasks.emplace_back(std::async(std::launch::async, [this, socketId, dataType]() {
657 return BindSocket(socketId, dataType);
658 }));
659 }
660 }
661 if (bindTasks.empty()) {
662 return ERR_OK;
663 }
664 for (auto&& task : bindTasks) {
665 int32_t ret = task.get();
666 HILOGI("bind task ret=%{public}d", ret);
667 if (ret == ERR_OK) {
668 return ERR_OK;
669 }
670 }
671
672 return CONNECT_CHANNEL_FAILED;
673 }
674
GetSocketStatus(const int32_t socketId)675 ChannelStatus ChannelManager::GetSocketStatus(const int32_t socketId)
676 {
677 std::shared_lock<std::shared_mutex> readLock(socketMutex_);
678 auto it = socketStatusMap_.find(socketId);
679 if (it != socketStatusMap_.end()) {
680 return it->second;
681 }
682 return ChannelStatus::CONNECTED;
683 }
684
BindSocket(const int32_t socketId,const ChannelDataType dataType)685 int32_t ChannelManager::BindSocket(const int32_t socketId, const ChannelDataType dataType)
686 {
687 QosSpeedType speedType = CHANNEL_DATATYPE_SPEED_MAP[dataType];
688 const QosTV* qos = qos_config[speedType];
689 const uint32_t qosCount = qos_speed_config[speedType];
690 HILOGI("start to bind socket, id:%{public}d, speed:%{public}d", socketId, speedType);
691 int32_t ret = Bind(socketId, qos, qosCount, &channelManagerListener);
692 HILOGI("bind end");
693 if (ret != ERR_OK) {
694 HILOGE("client bind failed, ret: %{public}d", ret);
695 return BIND_SOCKET_FAILED;
696 }
697 if (dataType == ChannelDataType::FILE) {
698 ret = SoftbusFileAdpater::GetInstance().SetFileSchema(socketId);
699 }
700 if (ret != ERR_OK) {
701 HILOGE("register %{public}d file schema failed", socketId);
702 return REGISTER_FILE_SCHEMA_FAILED;
703 }
704 SetSocketStatus(socketId, ChannelStatus::CONNECTED);
705 return ret;
706 }
707
SetSocketStatus(const int32_t socketId,const ChannelStatus status)708 int32_t ChannelManager::SetSocketStatus(const int32_t socketId, const ChannelStatus status)
709 {
710 HILOGI("start set socket id:%{public}d status %{public}d", socketId, static_cast<int32_t>(status));
711 int32_t channelId = 0;
712 {
713 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
714 auto it = socketStatusMap_.find(socketId);
715 if (it == socketStatusMap_.end()) {
716 HILOGE("no valid socket in socketStatusMap");
717 return INVALID_SOCKET_ID;
718 }
719 it->second = status;
720 auto channelIt = socketChannelMap_.find(socketId);
721 if (channelIt == socketChannelMap_.end()) {
722 HILOGE("no valid socket in socketChannelMap");
723 return INVALID_SOCKET_ID;
724 }
725 channelId = channelIt->second;
726 }
727 auto func = [channelId, this]() {
728 UpdateChannelStatus(channelId);
729 };
730 return PostCallbackTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
731 }
732
UpdateChannelStatus(const int32_t channelId)733 int32_t ChannelManager::UpdateChannelStatus(const int32_t channelId)
734 {
735 HILOGI("update channel id=%{public}d", channelId);
736 std::vector<int32_t> socketIds;
737 ChannelStatus curStatus = ChannelStatus::UNCONNECTED;
738 {
739 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
740 auto infoIt = channelInfoMap_.find(channelId);
741 if (infoIt == channelInfoMap_.end()) {
742 HILOGE("no valid channelInfo for %{public}d", channelId);
743 return INVALID_CHANNEL_ID;
744 }
745 for (const auto socket : infoIt->second.clientSockets) {
746 socketIds.push_back(socket);
747 }
748 curStatus = infoIt->second.status;
749 }
750
751 ChannelStatus newStatus = ChannelStatus::UNCONNECTED;
752 for (const auto id : socketIds) {
753 if (GetSocketStatus(id) == ChannelStatus::CONNECTED) {
754 newStatus = ChannelStatus::CONNECTED;
755 break;
756 }
757 }
758 HILOGI("curStatus:%{public}d, newStatus:%{public}d",
759 static_cast<int32_t>(curStatus), static_cast<int32_t>(newStatus));
760 if (newStatus != curStatus) {
761 return SetChannelStatus(channelId, newStatus);
762 }
763 return ERR_OK;
764 }
765
SetChannelStatus(const int32_t channelId,const ChannelStatus status)766 int32_t ChannelManager::SetChannelStatus(const int32_t channelId, const ChannelStatus status)
767 {
768 HILOGI("set channel:%{public}d, status:%{public}d", channelId, static_cast<int32_t>(status));
769 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
770 auto infoIt = channelInfoMap_.find(channelId);
771 if (infoIt == channelInfoMap_.end()) {
772 HILOGE("no valid channelInfo for %{public}d", channelId);
773 return INVALID_CHANNEL_ID;
774 }
775 infoIt->second.status = status;
776 return ERR_OK;
777 }
778
OnSocketConnected(int32_t socketId,const PeerSocketInfo & info)779 void ChannelManager::OnSocketConnected(int32_t socketId, const PeerSocketInfo& info)
780 {
781 if (socketId <= 0) {
782 HILOGE("invalid socketId: %{public}d", socketId);
783 return;
784 }
785 HILOGI("socket %{public}d binded", socketId);
786 std::optional<std::string> channelNameOpt = GetChannelNameFromSocket(info.name);
787 if (!channelNameOpt) {
788 HILOGE("error socket name, %{public}s", info.name);
789 return;
790 }
791 auto& channelName = *channelNameOpt;
792 std::optional<ChannelDataType> channelType = GetChannelDataTypeFromName(channelName);
793 if (!channelType) {
794 HILOGE("error channel name, %{public}s", channelName.c_str());
795 return;
796 }
797 // remove datatype flag
798 constexpr int32_t namePrefix = 2;
799 channelName = channelName.substr(namePrefix);
800 int32_t channelId = GetChannelId(channelName, *channelType);
801 if (!isValidChannelId(channelId)) {
802 HILOGE("invalid channelid=%{public}d with channelName %{public}s", channelId, channelName.c_str());
803 return;
804 }
805 auto func = [socketId, channelId, this]() {
806 UpdateChannel(socketId, channelId);
807 };
808 PostTask(func, AppExecFwk::EventQueue::Priority::IMMEDIATE);
809 HILOGI("add update channel task into handler");
810 }
811
UpdateChannel(const int32_t socketId,const int32_t channelId)812 int32_t ChannelManager::UpdateChannel(const int32_t socketId, const int32_t channelId)
813 {
814 int32_t ret = RegisterSocket(socketId, channelId);
815 if (ret != ERR_OK) {
816 HILOGE("failed to save binded socket to matching channel");
817 DoErrorCallback(channelId, ret);
818 return ret;
819 }
820 ret = SetSocketStatus(socketId, ChannelStatus::CONNECTED);
821 if (ret != ERR_OK) {
822 HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
823 DoErrorCallback(channelId, ret);
824 return ret;
825 }
826 DoConnectCallback(channelId);
827 return ret;
828 }
829
GetChannelNameFromSocket(const std::string & socketName)830 std::optional<std::string> ChannelManager::GetChannelNameFromSocket(const std::string& socketName)
831 {
832 size_t splitPos = socketName.find(SPLIT_FLAG, CHANNEL_NAME_PREFIX_LENGTH);
833 if (splitPos == std::string::npos) {
834 HILOGE("peer socket name invalid");
835 return std::nullopt;
836 }
837 return socketName.substr(splitPos + SPLIT_FLAG.length());
838 }
839
GetChannelDataTypeFromName(const std::string & channelName)840 std::optional<ChannelDataType> ChannelManager::GetChannelDataTypeFromName(const std::string& channelName)
841 {
842 std::string prefix = channelName.substr(0, 1);
843 for (auto&& dataType : CHANNEL_DATATYPE_PREFIX_MAP) {
844 if (prefix == dataType.second) {
845 return dataType.first;
846 }
847 }
848 return std::nullopt;
849 }
850
GetChannelId(const std::string & channelName,const ChannelDataType dataType)851 int32_t ChannelManager::GetChannelId(const std::string& channelName, const ChannelDataType dataType)
852 {
853 HILOGI("channelName: %{public}s, dataType: %{public}d", channelName.c_str(),
854 static_cast<int32_t>(dataType));
855 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
856 auto it = channelIdMap_.find(channelName);
857 if (it == channelIdMap_.end()) {
858 HILOGE("no valid channel exist");
859 return INVALID_CHANNEL_NAME;
860 }
861 for (const auto channelId : it->second) {
862 auto infoIt = channelInfoMap_.find(channelId);
863 if (infoIt == channelInfoMap_.end()) {
864 HILOGE("no valid channel exist");
865 return INVALID_CHANNEL_ID;
866 }
867 // find matching dataType
868 if (infoIt->second.dataType == dataType) {
869 return infoIt->second.channelId;
870 }
871 }
872 HILOGE("no matching channel");
873 return NO_SUCH_CHANNEL;
874 }
875
RegisterSocket(const int32_t socketId,const int32_t channelId)876 int32_t ChannelManager::RegisterSocket(const int32_t socketId, const int32_t channelId)
877 {
878 // update channelInfo
879 HILOGI("register socket with channel, channelId=%{public}d, socketId=%{public}d", channelId, socketId);
880 ChannelDataType dataType = ChannelDataType::BYTES;
881 {
882 std::unique_lock<std::shared_mutex> writeLock(channelMutex_);
883 auto infoIt = channelInfoMap_.find(channelId);
884 if (infoIt == channelInfoMap_.end()) {
885 HILOGE("no valid channel");
886 return INVALID_CHANNEL_ID;
887 }
888 dataType = infoIt->second.dataType;
889 infoIt->second.clientSockets.push_back(socketId);
890 infoIt->second.dataSenderReceivers[socketId] = std::make_unique<DataSenderReceiver>(socketId);
891 }
892 // update socket
893 {
894 std::unique_lock<std::shared_mutex> writeLock(socketMutex_);
895 socketChannelMap_[socketId] = channelId;
896 socketStatusMap_[socketId] = ChannelStatus::CONNECTED;
897 }
898 if (dataType == ChannelDataType::FILE) {
899 HILOGI("file socket, regist softbus file schema");
900 int32_t ret = SoftbusFileAdpater::GetInstance().SetFileSchema(socketId);
901 if (ret != ERR_OK) {
902 HILOGE("register %{public}d file schema failed", socketId);
903 return REGISTER_FILE_SCHEMA_FAILED;
904 }
905 }
906 return ERR_OK;
907 }
908
909 template <typename Func, typename... Args>
NotifyListeners(const int32_t channelId,Func listenerFunc,const AppExecFwk::EventQueue::Priority priority,Args &&...args)910 void ChannelManager::NotifyListeners(const int32_t channelId, Func listenerFunc,
911 const AppExecFwk::EventQueue::Priority priority, Args&&... args)
912 {
913 std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
914 auto it = listenersMap_.find(channelId);
915 if (it == listenersMap_.end() || it->second.empty()) {
916 HILOGE("no matching listener to %{public}d", channelId);
917 return;
918 }
919 auto& listeners = it->second;
920 for (const auto& listener : listeners) {
921 if (auto ptr = listener.lock()) {
922 auto func = [ptr, listenerFunc, channelId, args...]() {
923 (ptr.get()->*listenerFunc)(channelId, std::forward<Args>(args)...);
924 };
925 PostCallbackTask(func, priority);
926 }
927 }
928 }
929
OnSocketError(int32_t socketId,const int32_t errorCode)930 void ChannelManager::OnSocketError(int32_t socketId, const int32_t errorCode)
931 {
932 int32_t channelId = 0;
933 CHECK_SOCKET_ID(socketId);
934 CHECK_CHANNEL_ID(socketId, channelId);
935 DoErrorCallback(channelId, errorCode);
936 }
937
DoErrorCallback(const int32_t channelId,const int32_t errorCode)938 void ChannelManager::DoErrorCallback(const int32_t channelId, const int32_t errorCode)
939 {
940 NotifyListeners(channelId, &IChannelListener::OnError,
941 AppExecFwk::EventQueue::Priority::IMMEDIATE, errorCode);
942 }
943
DoConnectCallback(const int32_t channelId)944 void ChannelManager::DoConnectCallback(const int32_t channelId)
945 {
946 NotifyListeners(channelId, &IChannelListener::OnConnect,
947 AppExecFwk::EventQueue::Priority::IMMEDIATE);
948 }
949
OnSocketClosed(int32_t socketId,const ShutdownReason & reason)950 void ChannelManager::OnSocketClosed(int32_t socketId, const ShutdownReason& reason)
951 {
952 int32_t channelId = 0;
953 CHECK_SOCKET_ID(socketId);
954 CHECK_CHANNEL_ID(socketId, channelId);
955 HILOGI("socket %{public}d closed, reason:%{public}d", socketId, reason);
956 int32_t ret = SetSocketStatus(socketId, ChannelStatus::UNCONNECTED);
957 if (ret != ERR_OK) {
958 HILOGE("failed to set socket status, %{public}d->%{public}d", channelId, ret);
959 DoErrorCallback(channelId, ret);
960 return;
961 }
962 // delete channel when all socket shutdown
963 if (GetChannelStatus(channelId) == ChannelStatus::UNCONNECTED) {
964 DoDisConnectCallback(channelId, reason);
965 DeleteChannel(channelId);
966 }
967 }
968
GetChannelId(const int32_t socketId)969 int32_t ChannelManager::GetChannelId(const int32_t socketId)
970 {
971 std::shared_lock<std::shared_mutex> readLock(socketMutex_);
972 auto it = socketChannelMap_.find(socketId);
973 if (it == socketChannelMap_.end()) {
974 HILOGE("no proper channelId to %{public}d", socketId);
975 return INVALID_SOCKET_ID;
976 }
977 return it->second;
978 }
979
DoDisConnectCallback(const int32_t channelId,const ShutdownReason & reason)980 void ChannelManager::DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason)
981 {
982 NotifyListeners(channelId, &IChannelListener::OnDisConnect,
983 AppExecFwk::EventQueue::Priority::IMMEDIATE, reason);
984 }
985
GetChannelStatus(const int32_t channelId)986 ChannelStatus ChannelManager::GetChannelStatus(const int32_t channelId)
987 {
988 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
989 auto it = channelInfoMap_.find(channelId);
990 if (it != channelInfoMap_.end()) {
991 return it->second.status;
992 }
993 return ChannelStatus::UNCONNECTED;
994 }
995
996 template <typename Func, typename... Args>
DoSendData(const int32_t channelId,Func doSendFunc,Args &&...args)997 int32_t ChannelManager::DoSendData(const int32_t channelId, Func doSendFunc, Args&&... args)
998 {
999 HILOGD("start to send data");
1000 int32_t socketId = GetValidSocket(channelId);
1001 if (socketId <= 0) {
1002 HILOGE("no avaliable sockets, %{public}d", channelId);
1003 return NO_CONNECTED_SOCKET_ID;
1004 }
1005 int32_t ret = ERR_OK;
1006 {
1007 std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1008 auto infoIt = channelInfoMap_.find(channelId);
1009 if (infoIt == channelInfoMap_.end()) {
1010 HILOGE("no valid channel info exist");
1011 DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1012 return INVALID_CHANNEL_ID;
1013 }
1014 auto socketIt = infoIt->second.dataSenderReceivers.find(socketId);
1015 if (socketIt == infoIt->second.dataSenderReceivers.end()) {
1016 HILOGE("no valid socket");
1017 DoErrorCallback(channelId, INVALID_SOCKET_ID);
1018 return INVALID_SOCKET_ID;
1019 }
1020 auto& senderReceiver = *(socketIt->second);
1021 ret = (senderReceiver.*doSendFunc)(std::forward<Args>(args)...);
1022 }
1023 if (ret != ERR_OK) {
1024 HILOGE("failed send data, %{public}d", ret);
1025 DoErrorCallback(channelId, ret);
1026 return ret;
1027 }
1028 return ERR_OK;
1029 }
1030
SendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1031 int32_t ChannelManager::SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data)
1032 {
1033 if (!isValidChannelId(channelId) || data == nullptr) {
1034 HILOGE("invalid channel id. %{public}d", channelId);
1035 return INVALID_CHANNEL_ID;
1036 }
1037 HILOGI("start to send bytes, %{public}u", static_cast<uint32_t>(data->Size()));
1038 auto func = [channelId, data, this]() {
1039 DoSendBytes(channelId, data);
1040 };
1041 int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1042 std::to_string(channelId));
1043 if (ret != ERR_OK) {
1044 HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1045 return ret;
1046 }
1047 HILOGI("send bytes task added to handler");
1048 return ERR_OK;
1049 }
1050
DoSendBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1051 inline int32_t ChannelManager::DoSendBytes(const int32_t channelId,
1052 const std::shared_ptr<AVTransDataBuffer>& data)
1053 {
1054 HILOGD("start to send bytes");
1055 return DoSendData(channelId, &DataSenderReceiver::SendBytesData, data);
1056 }
1057
GetValidSocket(const int32_t channelId)1058 int32_t ChannelManager::GetValidSocket(const int32_t channelId)
1059 {
1060 std::vector<int32_t> socketIds;
1061 {
1062 std::shared_lock<std::shared_mutex> channelReadLock(channelMutex_);
1063 auto infoIt = channelInfoMap_.find(channelId);
1064 if (infoIt == channelInfoMap_.end() || infoIt->second.status == ChannelStatus::UNCONNECTED) {
1065 HILOGE("invalid channelId, %{public}d", channelId);
1066 return -1;
1067 }
1068 socketIds.insert(socketIds.begin(),
1069 infoIt->second.clientSockets.begin(), infoIt->second.clientSockets.end());
1070 }
1071
1072 for (const auto socketId : socketIds) {
1073 if (GetSocketStatus(socketId) == ChannelStatus::CONNECTED) {
1074 return socketId;
1075 }
1076 }
1077 return -1;
1078 }
1079
SendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1080 int32_t ChannelManager::SendStream(const int32_t channelId,
1081 const std::shared_ptr<AVTransStreamData>& data)
1082 {
1083 if (!isValidChannelId(channelId) || data == nullptr) {
1084 HILOGE("invalid channel id");
1085 DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1086 return INVALID_CHANNEL_ID;
1087 }
1088 HILOGD("start to send stream");
1089 auto func = [=]() {
1090 DoSendStream(channelId, data);
1091 };
1092 int32_t ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW,
1093 std::to_string(channelId));
1094 if (ret != ERR_OK) {
1095 HILOGE("failed to add send stream task, ret=%{public}d", ret);
1096 return POST_TASK_FAILED;
1097 }
1098 HILOGD("send stream task added to handler");
1099 return ERR_OK;
1100 }
1101
DoSendStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1102 int32_t ChannelManager::DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1103 {
1104 HILOGD("start to send stream");
1105 return DoSendData(channelId, &DataSenderReceiver::SendStreamData, data);
1106 }
1107
SendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1108 int32_t ChannelManager::SendMessage(const int32_t channelId,
1109 const std::shared_ptr<AVTransDataBuffer>& data)
1110 {
1111 if (!isValidChannelId(channelId) || data == nullptr) {
1112 HILOGE("invalid channel id. %{public}d", channelId);
1113 return INVALID_CHANNEL_ID;
1114 }
1115 HILOGD("start to send message, %{public}u", static_cast<uint32_t>(data->Size()));
1116 auto func = [channelId, data, this]() {
1117 DoSendMessage(channelId, data);
1118 };
1119 int32_t ret = PostMsgTask(func, AppExecFwk::EventQueue::Priority::HIGH);
1120 if (ret != ERR_OK) {
1121 HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1122 return ret;
1123 }
1124 HILOGD("send message task added to handler");
1125 return ERR_OK;
1126 }
1127
DoSendMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & data)1128 int32_t ChannelManager::DoSendMessage(const int32_t channelId,
1129 const std::shared_ptr<AVTransDataBuffer>& data)
1130 {
1131 HILOGI("start to send message");
1132 return DoSendData(channelId, &DataSenderReceiver::SendMessageData, data);
1133 }
1134
SendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1135 int32_t ChannelManager::SendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1136 const std::vector<std::string>& dFiles)
1137 {
1138 HILOGI("start to send files, %{public}d", channelId);
1139 if (!isValidChannelId(channelId) || sFiles.empty() || dFiles.empty()) {
1140 HILOGE("invalid channel id. %{public}d or empty sfiles", channelId);
1141 return INVALID_PARAMETERS_ERR;
1142 }
1143 if (sFiles.size() != dFiles.size() || sFiles.size() > MAX_FILE_COUNT) {
1144 HILOGE("src size:%{public}d, dst size:%{public}d illegal",
1145 static_cast<int32_t>(sFiles.size()),
1146 static_cast<int32_t>(dFiles.size()));
1147 }
1148 int32_t ret = ERR_OK;
1149 auto func = [channelId, sFiles, dFiles, this]() {
1150 DoSendFile(channelId, sFiles, dFiles);
1151 };
1152 ret = PostTask(func, AppExecFwk::EventQueue::Priority::LOW);
1153 if (ret != ERR_OK) {
1154 HILOGE("failed to add send bytes task, ret=%{public}d", ret);
1155 return ret;
1156 }
1157 HILOGI("send files task added to handler");
1158 return ERR_OK;
1159 }
1160
DoSendFile(const int32_t channelId,const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)1161 int32_t ChannelManager::DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
1162 const std::vector<std::string>& dFiles)
1163 {
1164 HILOGI("start to do send files");
1165 return DoSendData(channelId, &DataSenderReceiver::SendFileData, sFiles, dFiles);
1166 }
1167
OnBytesReceived(int32_t socketId,const void * data,const uint32_t dataLen)1168 void ChannelManager::OnBytesReceived(int32_t socketId,
1169 const void* data, const uint32_t dataLen)
1170 {
1171 int32_t channelId = 0;
1172 CHECK_SOCKET_ID(socketId);
1173 CHECK_CHANNEL_ID(socketId, channelId);
1174 CHECK_DATA_NULL(socketId, data, OnError);
1175 HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1176 std::shared_ptr<AVTransDataBuffer> packedData = ProcessRecvData(channelId, socketId, data, dataLen);
1177 if (!packedData) {
1178 return;
1179 }
1180 DoBytesReceiveCallback(channelId, packedData);
1181 }
1182
ProcessRecvData(const int32_t channelId,const int32_t socketId,const void * data,const uint32_t dataLen)1183 std::shared_ptr<AVTransDataBuffer> ChannelManager::ProcessRecvData(const int32_t channelId,
1184 const int32_t socketId, const void* data, const uint32_t dataLen)
1185 {
1186 std::shared_lock<std::shared_mutex> readLock(channelMutex_);
1187 const uint8_t* header = static_cast<const uint8_t*>(data);
1188 auto infoIt = channelInfoMap_.find(channelId);
1189 if (infoIt == channelInfoMap_.end()) {
1190 DoErrorCallback(channelId, INVALID_CHANNEL_ID);
1191 return nullptr;
1192 }
1193
1194 int32_t ret = infoIt->second.dataSenderReceivers[socketId]->PackRecvPacketData(header, dataLen);
1195 if (ret != ERR_OK) {
1196 HILOGE("pack recv data failed");
1197 DoErrorCallback(channelId, ret);
1198 return nullptr;
1199 }
1200 return infoIt->second.dataSenderReceivers[socketId]->GetPacketedData();
1201 }
1202
DoBytesReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1203 void ChannelManager::DoBytesReceiveCallback(const int32_t channelId,
1204 const std::shared_ptr<AVTransDataBuffer>& buffer)
1205 {
1206 NotifyListeners(channelId, &IChannelListener::OnBytes,
1207 AppExecFwk::EventQueue::Priority::LOW, buffer);
1208 }
1209
OnMessageReceived(int32_t socketId,const void * data,const uint32_t dataLen)1210 void ChannelManager::OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen)
1211 {
1212 int32_t channelId = 0;
1213 CHECK_SOCKET_ID(socketId);
1214 CHECK_CHANNEL_ID(socketId, channelId);
1215 CHECK_DATA_NULL(socketId, data, OnError);
1216 HILOGI("receive data: %{public}d, len=%{public}d", socketId, dataLen);
1217 std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(dataLen);
1218 int32_t ret = memcpy_s(buffer->Data(),
1219 buffer->Size(), data, dataLen);
1220 if (ret != ERR_OK) {
1221 HILOGE("pack recv data failed");
1222 DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1223 return;
1224 }
1225 DoMessageReceiveCallback(channelId, buffer);
1226 }
1227
DoMessageReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer)1228 void ChannelManager::DoMessageReceiveCallback(const int32_t channelId,
1229 const std::shared_ptr<AVTransDataBuffer>& buffer)
1230 {
1231 NotifyListeners(channelId, &IChannelListener::OnMessage,
1232 AppExecFwk::EventQueue::Priority::HIGH, buffer);
1233 }
1234
OnStreamReceived(int32_t socketId,const StreamData * data,const StreamData * ext,const StreamFrameInfo * param)1235 void ChannelManager::OnStreamReceived(int32_t socketId, const StreamData* data,
1236 const StreamData* ext, const StreamFrameInfo* param)
1237 {
1238 int32_t channelId = 0;
1239 CHECK_SOCKET_ID(socketId);
1240 CHECK_CHANNEL_ID(socketId, channelId);
1241 CHECK_DATA_NULL(socketId, data, OnError);
1242 CHECK_DATA_NULL(socketId, ext, OnError);
1243 std::shared_ptr<AVTransDataBuffer> buffer = std::make_shared<AVTransDataBuffer>(data->bufLen);
1244 int32_t ret = memcpy_s(buffer->Data(), buffer->Size(), data->buf, data->bufLen);
1245 if (ret != ERR_OK) {
1246 HILOGE("copy stream data failed, %{public}d", socketId);
1247 DoErrorCallback(channelId, COPY_DATA_TO_BUFFER_FAILED);
1248 return;
1249 }
1250 AVTransStreamDataExt streamDataExt;
1251 std::shared_ptr<AVTransStreamData> streamData = std::make_shared<AVTransStreamData>(buffer, streamDataExt);
1252 ret = streamData->DeserializeStreamDataExt(ext->buf);
1253 if (ret != ERR_OK) {
1254 HILOGE("deserialize stream ext failed, %{public}d", socketId);
1255 DoErrorCallback(channelId, PARSE_AV_TRANS_STREAM_EXT_FAILED);
1256 return;
1257 }
1258 DoStreamReceiveCallback(channelId, streamData);
1259 }
1260
DoStreamReceiveCallback(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & data)1261 void ChannelManager::DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data)
1262 {
1263 NotifyListeners(channelId, &IChannelListener::OnStream,
1264 AppExecFwk::EventQueue::Priority::LOW, data);
1265 }
1266
OnFileEventReceived(int32_t socketId,FileEvent * event)1267 void ChannelManager::OnFileEventReceived(int32_t socketId, FileEvent *event)
1268 {
1269 int32_t channelId = 0;
1270 CHECK_SOCKET_ID(socketId);
1271 // update recv path before onbind
1272 if (event == nullptr) {
1273 HILOGE("socket %{public}d event empty", socketId);
1274 return;
1275 }
1276 if (event->type == FileEventType::FILE_EVENT_RECV_UPDATE_PATH) {
1277 HILOGI("start to set update path func, %{public}d", socketId);
1278 return DispatchProcessFileEvent(fileChannelId_.load(), event);
1279 }
1280 CHECK_CHANNEL_ID(socketId, channelId);
1281 CHECK_DATA_NULL(socketId, event, OnError);
1282 HILOGI("start to dispatch file event, %{public}d", channelId);
1283 DispatchProcessFileEvent(channelId, event);
1284 }
1285
DispatchProcessFileEvent(int32_t channelId,FileEvent * event)1286 void ChannelManager::DispatchProcessFileEvent(int32_t channelId, FileEvent *event)
1287 {
1288 HILOGI("start to dispatch file event");
1289 switch (event->type) {
1290 case FileEventType::FILE_EVENT_SEND_PROCESS:
1291 case FileEventType::FILE_EVENT_SEND_FINISH: {
1292 DealFileSendEvent(channelId, event);
1293 break;
1294 }
1295 case FileEventType::FILE_EVENT_RECV_START:
1296 case FileEventType::FILE_EVENT_RECV_PROCESS:
1297 case FileEventType::FILE_EVENT_RECV_FINISH: {
1298 DealFileRecvEvent(channelId, event);
1299 break;
1300 }
1301 case FileEventType::FILE_EVENT_BUTT:
1302 case FileEventType::FILE_EVENT_SEND_ERROR:
1303 case FileEventType::FILE_EVENT_RECV_ERROR: {
1304 DealFileErrorEvent(channelId, event);
1305 break;
1306 }
1307 case FileEventType::FILE_EVENT_RECV_UPDATE_PATH: {
1308 DealFileUpdatePathEvent(channelId, event);
1309 break;
1310 }
1311 default:
1312 break;
1313 }
1314 }
1315
DealFileSendEvent(int32_t channelId,FileEvent * event)1316 void ChannelManager::DealFileSendEvent(int32_t channelId, FileEvent *event)
1317 {
1318 HILOGI("start to deal file send event, %{public}d", channelId);
1319 FileInfo info;
1320 if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1321 info.commonInfo.eventType = ChannelFileEvent::SEND_PROCESS;
1322 } else {
1323 info.commonInfo.eventType = ChannelFileEvent::SEND_FINISH;
1324 }
1325 info.commonInfo.fileCnt = event->fileCnt;
1326 for (uint32_t i = 0; i < event->fileCnt; ++i) {
1327 info.commonInfo.fileList.push_back(std::string(event->files[i]));
1328 }
1329
1330 FileSendInfo sendInfo;
1331 sendInfo.bytesProcessed = event->bytesProcessed;
1332 sendInfo.bytesTotal = event->bytesTotal;
1333 if (event->type == FileEventType::FILE_EVENT_SEND_PROCESS) {
1334 sendInfo.rate = event->rate;
1335 }
1336 info.sendInfo = sendInfo;
1337 DoFileSendCallback(channelId, info);
1338 HILOGI("end");
1339 }
1340
DealFileRecvEvent(int32_t channelId,FileEvent * event)1341 void ChannelManager::DealFileRecvEvent(int32_t channelId, FileEvent *event)
1342 {
1343 HILOGI("start to deal file recv event, %{public}d", channelId);
1344 FileInfo info;
1345 if (event->type == FileEventType::FILE_EVENT_RECV_START) {
1346 info.commonInfo.eventType = ChannelFileEvent::RECV_START;
1347 } else if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1348 info.commonInfo.eventType = ChannelFileEvent::RECV_PROCESS;
1349 } else {
1350 info.commonInfo.eventType = ChannelFileEvent::RECV_FINISH;
1351 }
1352 info.commonInfo.fileCnt = event->fileCnt;
1353 for (uint32_t i = 0; i < event->fileCnt; ++i) {
1354 info.commonInfo.fileList.push_back(std::string(event->files[i]));
1355 }
1356
1357 FileRecvInfo recvInfo;
1358 recvInfo.bytesProcessed = event->bytesProcessed;
1359 recvInfo.bytesTotal = event->bytesTotal;
1360 if (event->type == FileEventType::FILE_EVENT_RECV_PROCESS) {
1361 recvInfo.rate = event->rate;
1362 }
1363 info.recvInfo = recvInfo;
1364 DoFileRecvCallback(channelId, info);
1365 HILOGI("end");
1366 }
1367
DealFileErrorEvent(int32_t channelId,FileEvent * event)1368 void ChannelManager::DealFileErrorEvent(int32_t channelId, FileEvent *event)
1369 {
1370 HILOGI("start to deal file error event, %{public}d", channelId);
1371 FileInfo info;
1372 if (event->type == FileEventType::FILE_EVENT_SEND_ERROR) {
1373 info.commonInfo.eventType = ChannelFileEvent::SEND_ERROR;
1374 } else {
1375 info.commonInfo.eventType = ChannelFileEvent::RECV_ERROR;
1376 }
1377 info.commonInfo.fileCnt = event->fileCnt;
1378 for (uint32_t i = 0; i < event->fileCnt; ++i) {
1379 info.commonInfo.fileList.push_back(std::string(event->files[i]));
1380 }
1381
1382 FileErrorInfo errorInfo;
1383 errorInfo.errorCode = event->errorCode;
1384 info.errorInfo = errorInfo;
1385 if (info.commonInfo.eventType == ChannelFileEvent::RECV_ERROR) {
1386 DoFileRecvCallback(channelId, info);
1387 } else {
1388 DoFileSendCallback(channelId, info);
1389 }
1390 HILOGI("end");
1391 }
1392
DealFileUpdatePathEvent(int32_t channelId,FileEvent * event)1393 void ChannelManager::DealFileUpdatePathEvent(int32_t channelId, FileEvent *event)
1394 {
1395 HILOGI("start to deal file update path event, %{public}d", channelId);
1396 event->UpdateRecvPath = GetRecvPath;
1397 HILOGI("end");
1398 }
1399
GetRecvPathFromUser()1400 const char* ChannelManager::GetRecvPathFromUser()
1401 {
1402 HILOGI("get recv path from user");
1403 int32_t channelId = static_cast<int32_t>(fileChannelId_.load());
1404 std::shared_lock<std::shared_mutex> readLock(listenerMutex_);
1405 auto it = listenersMap_.find(channelId);
1406 if (it == listenersMap_.end() || it->second.empty()) {
1407 HILOGE("no matching listener to %{public}d", channelId);
1408 return nullptr;
1409 }
1410 auto& listeners = it->second;
1411 for (const auto& listener : listeners) {
1412 if (auto ptr = listener.lock()) {
1413 return ptr->GetRecvPath(channelId);
1414 }
1415 }
1416 return nullptr;
1417 }
1418
DoFileRecvCallback(const int32_t channelId,const FileInfo & info)1419 void ChannelManager::DoFileRecvCallback(const int32_t channelId, const FileInfo& info)
1420 {
1421 NotifyListeners(channelId, &IChannelListener::OnRecvFile,
1422 AppExecFwk::EventQueue::Priority::HIGH, info);
1423 }
1424
DoFileSendCallback(const int32_t channelId,const FileInfo & info)1425 void ChannelManager::DoFileSendCallback(const int32_t channelId, const FileInfo& info)
1426 {
1427 NotifyListeners(channelId, &IChannelListener::OnSendFile,
1428 AppExecFwk::EventQueue::Priority::HIGH, info);
1429 }
1430 }
1431 }
1432