1 /*
2 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "ability_connection_session.h"
16
17 #include <charconv>
18 #include <chrono>
19 #include <map>
20 #include <unistd.h>
21 #include <sys/prctl.h>
22 #include <sstream>
23 #include <iomanip>
24 #include <future>
25 #include <vector>
26
27 #include "ability_connection_manager.h"
28 #include "dtbcollabmgr_log.h"
29 #include "dtbschedmgr_log.h"
30 #include "distributed_client.h"
31 #include "ipc_skeleton.h"
32 #include "message_data_header.h"
33 #include "openssl/sha.h"
34 #include "tokenid_kit.h"
35
36 namespace OHOS {
37 namespace DistributedCollab {
38 namespace {
39 const std::string TAG = "AbilityConnectionSession";
40 const std::string CONNECT_SESSION_TIMEOUT_END_TASK = "connect_session_timeout_end_task";
41 const std::u16string DMS_PROXY_INTERFACE_TOKEN = u"ohos.distributedschedule.accessToken";
42 const std::string EVENT_CONNECT = "connect";
43 const std::string EVENT_DISCONNECT = "disconnect";
44 const std::string EVENT_RECEIVE_MESSAGE = "receiveMessage";
45 const std::string EVENT_RECEIVE_DATA = "receiveData";
46 const std::string EVENT_RECEIVE_IMAGE = "receiveImage";
47 const std::string EVENT_COLLABORATE = "collaborateEvent";
48 constexpr int32_t DSCHED_COLLAB_PROTOCOL_VERSION = 1;
49 static constexpr uint16_t PROTOCOL_VERSION = 1;
50 constexpr int32_t CHANNEL_NAME_LENGTH = 48;
51 constexpr int32_t VIDEO_FRAME_RATE = 30;
52 constexpr int32_t DEFAULT_APP_UID = 0;
53 constexpr int32_t DEFAULT_APP_PID = 0;
54 constexpr int32_t DEFAULT_INSTANCE_ID = 0;
55 constexpr int32_t HEX_WIDTH = 2;
56 constexpr char FILL_CHAR = '0';
57 constexpr int32_t WAIT_FOR_CONNECT = 11;
58 }
59
AbilityConnectionSession(int32_t sessionId,std::string serverSocketName,AbilityConnectionSessionInfo sessionInfo,ConnectOption opt)60 AbilityConnectionSession::AbilityConnectionSession(int32_t sessionId, std::string serverSocketName,
61 AbilityConnectionSessionInfo sessionInfo, ConnectOption opt)
62 {
63 sessionId_ = sessionId;
64 localSocketName_ = serverSocketName;
65 sessionInfo_ = sessionInfo;
66 connectOption_ = opt;
67 version_ = DSCHED_COLLAB_PROTOCOL_VERSION;
68 InitMessageHandlerMap();
69 }
70
~AbilityConnectionSession()71 AbilityConnectionSession::~AbilityConnectionSession()
72 {
73 }
74
InitMessageHandlerMap()75 void AbilityConnectionSession::InitMessageHandlerMap()
76 {
77 messageHandlerMap_[static_cast<uint32_t>(MessageType::NORMAL)] =
78 [this](const std::string& msg) { ExeuteMessageEventCallback(msg); };
79 messageHandlerMap_[static_cast<uint32_t>(MessageType::WIFI_OPEN)] =
80 [this](const std::string&) { ConnectStreamChannel(); };
81 messageHandlerMap_[static_cast<uint32_t>(MessageType::UPDATE_RECV_ENGINE_CHANNEL)] =
82 [this](const std::string&) { UpdateRecvEngineTransChannel(); };
83 messageHandlerMap_[static_cast<uint32_t>(MessageType::UPDATE_SENDER_ENGINE_CHANNEL)] =
84 [this](const std::string&) { UpdateSenderEngineTransChannel(); };
85 messageHandlerMap_[static_cast<uint32_t>(MessageType::CONNECT_FILE_CHANNEL)] =
86 [this](const std::string& msg) { ConnectFileChannel(msg); };
87 messageHandlerMap_[static_cast<uint32_t>(MessageType::FILE_CHANNEL_CONNECT_SUCCESS)] =
88 [this](const std::string&) { NotifyAppConnectResult(true); };
89 messageHandlerMap_[static_cast<uint32_t>(MessageType::FILE_CHANNEL_CONNECT_FAILED)] =
90 [this](const std::string&) { NotifyAppConnectResult(false); };
91 messageHandlerMap_[static_cast<uint32_t>(MessageType::SESSION_CONNECT_SUCCESS)] =
92 [this](const std::string&) { HandleSessionConnect(); };
93 messageHandlerMap_[static_cast<uint32_t>(MessageType::RECEIVE_STREAM_START)] =
94 [this](const std::string&) { UpdateRecvEngineStatus(); };
95 }
96
Init()97 void AbilityConnectionSession::Init()
98 {
99 HILOGI("Init AbilityConnectionSession start");
100 if (eventHandler_ != nullptr) {
101 HILOGI("AbilityConnectionSession already inited, end.");
102 return;
103 }
104 eventThread_ = std::thread(&AbilityConnectionSession::StartEvent, this);
105 std::unique_lock<std::mutex> lock(eventMutex_);
106 eventCon_.wait(lock, [this] {
107 return eventHandler_ != nullptr;
108 });
109 HILOGI("Init AbilityConnectionSession end");
110 }
111
StartEvent()112 void AbilityConnectionSession::StartEvent()
113 {
114 HILOGI("StartEvent start");
115 std::string name = localSocketName_ + std::to_string(sessionId_);
116 prctl(PR_SET_NAME, name.c_str());
117 auto runner = AppExecFwk::EventRunner::Create(false);
118 {
119 std::lock_guard<std::mutex> lock(eventMutex_);
120 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
121 }
122 eventCon_.notify_one();
123 runner->Run();
124 HILOGI("StartEvent end");
125 }
126
UnInit()127 void AbilityConnectionSession::UnInit()
128 {
129 HILOGI("UnInit start");
130 std::unique_lock<std::mutex> lock(eventMutex_);
131 if (eventHandler_ != nullptr) {
132 eventHandler_->GetEventRunner()->Stop();
133 if (eventThread_.joinable()) {
134 eventThread_.join();
135 }
136 eventHandler_ = nullptr;
137 } else {
138 HILOGE("eventHandler_ is nullptr");
139 }
140 HILOGI("UnInit end");
141 }
142
Release()143 void AbilityConnectionSession::Release()
144 {
145 HILOGI("called.");
146 {
147 std::unique_lock<std::shared_mutex> sessionStatusWriteLock(sessionMutex_);
148 if (sessionStatus_ == SessionStatus::UNCONNECTED) {
149 HILOGI("The session resource has been released.");
150 return;
151 }
152 sessionStatus_ = SessionStatus::UNCONNECTED;
153 }
154 AbilityConnectionManager::GetInstance().DeleteConnectSession(sessionInfo_, sessionId_);
155 DestroyStream();
156
157 std::unique_lock<std::shared_mutex> channelLock(transChannelMutex_);
158 for (auto iter = transChannels_.begin(); iter != transChannels_.end(); iter++) {
159 ChannelManager::GetInstance().DeleteChannel(iter->second.channelId);
160 }
161 transChannels_.clear();
162 }
163
GetPeerInfo()164 PeerInfo AbilityConnectionSession::GetPeerInfo()
165 {
166 return sessionInfo_.peerInfo_;
167 }
168
GetLocalInfo()169 PeerInfo AbilityConnectionSession::GetLocalInfo()
170 {
171 return sessionInfo_.localInfo_;
172 }
173
GetServerToken()174 std::string AbilityConnectionSession::GetServerToken()
175 {
176 return dmsServerToken_;
177 }
178
HandlePeerVersion(int32_t version)179 int32_t AbilityConnectionSession::HandlePeerVersion(int32_t version)
180 {
181 HILOGI("called.");
182 DistributedClient dmsClient;
183 int32_t ret = dmsClient.CollabMission(sessionId_,
184 localSocketName_, sessionInfo_,
185 connectOption_, dmsServerToken_);
186 if (ret != ERR_OK) {
187 HILOGE("collab mission start failed.");
188 ConnectResult connectResult(false, ConnectErrorCode::SYSTEM_INTERNAL_ERROR, "");
189 ExeuteConnectCallback(connectResult);
190 }
191 return ret;
192 }
193
Connect(ConnectCallback & callback)194 int32_t AbilityConnectionSession::Connect(ConnectCallback& callback)
195 {
196 HILOGI("called.");
197 connectCallback_ = callback;
198 if (CheckConnectedSession()) {
199 HILOGE("connected session %{public}d exists.", sessionId_);
200 return CONNECTED_SESSION_EXISTS;
201 }
202 if (!CheckWifiStatus()) {
203 HILOGI("Wi-Fi is not enabled.");
204 ConnectResult connectResult(false, ConnectErrorCode::LOCAL_WIFI_NOT_OPEN, "");
205 ExeuteConnectCallback(connectResult);
206 return LOCAL_WIFI_NOT_OPEN;
207 }
208
209 {
210 std::unique_lock<std::shared_mutex> sessionStatusWriteLock(sessionMutex_);
211 if (sessionStatus_ != SessionStatus::UNCONNECTED) {
212 HILOGE("session has start to connect, sessionStatus is %{public}d", sessionStatus_);
213 ConnectResult connectResult(false, ConnectErrorCode::CONNECTED_SESSION_EXISTS, "");
214 ExeuteConnectCallback(connectResult);
215 return CONNECTED_SESSION_EXISTS;
216 }
217 sessionStatus_ = SessionStatus::CONNECTING;
218 }
219 direction_ = CollabrateDirection::COLLABRATE_SOURCE;
220 dmsServerToken_ = CreateDmsServerToken();
221 DistributedClient dmsClient;
222 int32_t ret = dmsClient.GetPeerVersion(sessionId_, sessionInfo_.peerInfo_.deviceId, dmsServerToken_);
223 if (ret != ERR_OK) {
224 HILOGE("collab mission start failed.");
225 ConnectResult connectResult(false, ConnectErrorCode::SYSTEM_INTERNAL_ERROR, "");
226 ExeuteConnectCallback(connectResult);
227 }
228 return ret;
229 }
230
CheckConnectedSession()231 bool AbilityConnectionSession::CheckConnectedSession()
232 {
233 if (IsConnected()) {
234 HILOGE("session %{public}d connected", sessionId_);
235 ConnectResult connectResult(false, ConnectErrorCode::CONNECTED_SESSION_EXISTS, "");
236 ExeuteConnectCallback(connectResult);
237 return true;
238 }
239
240 int32_t ret = AbilityConnectionManager::GetInstance().UpdateClientSession(sessionInfo_, sessionId_);
241 if (ret != ERR_OK) {
242 HILOGE("connected session exists.");
243 ConnectResult connectResult(false, ConnectErrorCode::CONNECTED_SESSION_EXISTS, "");
244 ExeuteConnectCallback(connectResult);
245 return true;
246 }
247 return false;
248 }
249
CheckWifiStatus()250 bool AbilityConnectionSession::CheckWifiStatus()
251 {
252 uint64_t tokenId = OHOS::IPCSkeleton::GetSelfTokenID();
253 if (OHOS::Security::AccessToken::TokenIdKit::IsSystemAppByFullTokenID(tokenId)) {
254 HILOGI("The current application is a system app.");
255 return true;
256 }
257
258 DistributedClient dmsClient;
259 return dmsClient.GetWifiStatus();
260 }
261
CreateDmsServerToken()262 std::string AbilityConnectionSession::CreateDmsServerToken()
263 {
264 auto pid = getprocpid();
265 auto now = std::chrono::system_clock::now();
266 auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
267 std::string input = std::to_string(pid) + std::to_string(sessionId_) + std::to_string(timestamp);
268 unsigned char hash[SHA256_DIGEST_LENGTH];
269 SHA256((const unsigned char*)input.c_str(), input.length(), hash);
270 std::stringstream hashStr;
271 for (int i = 0; i < SHA256_DIGEST_LENGTH; ++i) {
272 hashStr << std::hex << std::setw(HEX_WIDTH) << std::setfill(FILL_CHAR) << (int)hash[i];
273 }
274 return hashStr.str().substr(0, CHANNEL_NAME_LENGTH);
275 }
276
Disconnect()277 int32_t AbilityConnectionSession::Disconnect()
278 {
279 HILOGI("called.");
280 Release();
281 DistributedClient dmsClient;
282 int32_t ret = dmsClient.NotifyCloseCollabSession(dmsServerToken_);
283 HILOGI("Notify Server DisConnect result is %{public}d", ret);
284 return ERR_OK;
285 }
286
AcceptConnect(const std::string & token)287 int32_t AbilityConnectionSession::AcceptConnect(const std::string& token)
288 {
289 HILOGI("called.");
290 DistributedClient dmsClient;
291 if (!CheckWifiStatus()) {
292 HILOGI("Wi-Fi is not enabled.");
293 dmsClient.NotifyPrepareResult(token, PEER_WIFI_NOT_OPEN, sessionId_, localSocketName_);
294 return PEER_WIFI_NOT_OPEN;
295 }
296 {
297 std::unique_lock<std::shared_mutex> sessionStatusWriteLock(sessionMutex_);
298 if (sessionStatus_ != SessionStatus::UNCONNECTED) {
299 HILOGE("session has start to connect, sessionStatus is %{public}d", sessionStatus_);
300 return INVALID_PARAMETERS_ERR;
301 }
302 sessionStatus_ = SessionStatus::CONNECTING;
303 }
304 dmsServerToken_ = token;
305 int32_t ret = AbilityConnectionManager::GetInstance().UpdateServerSession(sessionInfo_, sessionId_);
306 if (ret != ERR_OK) {
307 dmsClient.NotifyPrepareResult(token, ret, sessionId_, localSocketName_);
308 Release();
309 return INVALID_PARAMETERS_ERR;
310 }
311
312 direction_ = CollabrateDirection::COLLABRATE_SINK;
313 ret = InitChannels();
314 if (ret != ERR_OK) {
315 HILOGE("init sink client failed!");
316 dmsClient.NotifyPrepareResult(token, ret, sessionId_, localSocketName_);
317 Release();
318 return ret;
319 }
320 ret = dmsClient.NotifyPrepareResult(token, ERR_OK, sessionId_, localSocketName_);
321 if (ret != ERR_OK) {
322 HILOGE("notify prepare result failed!");
323 Release();
324 return ret;
325 }
326 return ERR_OK;
327 }
328
HandleCollabResult(int32_t result,const std::string & peerSocketName,const std::string & dmsServerToken,const std::string & reason)329 int32_t AbilityConnectionSession::HandleCollabResult(int32_t result, const std::string& peerSocketName,
330 const std::string& dmsServerToken, const std::string& reason)
331 {
332 HILOGI("called.");
333 dmsServerToken_ = dmsServerToken;
334 peerSocketName_ = peerSocketName;
335 if (result != ERR_OK) {
336 HILOGE("collab result is failed, ret = %{public}d, reason = %{public}s", result, reason.c_str());
337 NotifyCollabErrorResultFromSa(false, ConvertToConnectErrorCode(result), reason);
338 return INVALID_PARAMETERS_ERR;
339 }
340
341 if (InitChannels() != ERR_OK || ConnectChannels() != ERR_OK) {
342 NotifyAppConnectResult(false);
343 return INVALID_PARAMETERS_ERR;
344 }
345
346 if (connectOption_.needReceiveFile) {
347 return RequestReceiveFileChannelConnection();
348 }
349
350 NotifyPeerSessionConnected();
351 NotifyAppConnectResult(true);
352 return ERR_OK;
353 }
354
ConvertToConnectErrorCode(int32_t collabResult)355 ConnectErrorCode AbilityConnectionSession::ConvertToConnectErrorCode(int32_t collabResult)
356 {
357 HILOGI("Collaboration failed code is %{public}d.", collabResult);
358 switch (collabResult) {
359 case CONNECTED_SESSION_EXISTS:
360 return ConnectErrorCode::CONNECTED_SESSION_EXISTS;
361 case SAME_SESSION_IS_CONNECTING:
362 return ConnectErrorCode::CONNECTED_SESSION_EXISTS;
363 case PEER_WIFI_NOT_OPEN:
364 return ConnectErrorCode::PEER_WIFI_NOT_OPEN;
365 case DistributedSchedule::COLLAB_ABILITY_REJECT_ERR:
366 return ConnectErrorCode::PEER_APP_REJECTED;
367 case PEER_ABILITY_NO_ONCOLLABORATE:
368 return ConnectErrorCode::PEER_ABILITY_NO_ONCOLLABORATE;
369 default:
370 return ConnectErrorCode::SYSTEM_INTERNAL_ERROR;
371 }
372 }
373
RequestReceiveFileChannelConnection()374 int32_t AbilityConnectionSession::RequestReceiveFileChannelConnection()
375 {
376 HILOGI("notify the peer end bind file.");
377 int32_t ret = SendMessage(localSocketName_, MessageType::CONNECT_FILE_CHANNEL);
378 if (ret != ERR_OK) {
379 HILOGE("Failed to notify the file channel connection, ret = %{public}d", ret);
380 NotifyAppConnectResult(false);
381 }
382 return ret;
383 }
384
NotifyPeerSessionConnected()385 void AbilityConnectionSession::NotifyPeerSessionConnected()
386 {
387 if (!connectOption_.HasFileTransfer()) {
388 HILOGI("No notification required.");
389 return;
390 }
391
392 HILOGI("notify the peer end bind file.");
393 int32_t ret = SendMessage("SESSION_CONNECT_SUCCESS", MessageType::SESSION_CONNECT_SUCCESS);
394 if (ret != ERR_OK) {
395 HILOGE("Failed to notify the session connection result, ret = %{public}d", ret);
396 }
397 }
398
NotifyAppConnectResult(bool isConnected,const ConnectErrorCode errorCode,const std::string & reason)399 void AbilityConnectionSession::NotifyAppConnectResult(bool isConnected,
400 const ConnectErrorCode errorCode, const std::string& reason)
401 {
402 HILOGE("notify result from self %{public}d", sessionId_);
403 ConnectResult connectResult(isConnected, errorCode, reason);
404 if (isConnected) {
405 connectResult.sessionId = sessionId_;
406 } else {
407 Release();
408 DistributedClient dmsClient;
409 dmsClient.NotifyCloseCollabSession(dmsServerToken_);
410 }
411 ExeuteConnectCallback(connectResult);
412 }
413
NotifyCollabErrorResultFromSa(bool isConnected,const ConnectErrorCode errorCode,const std::string & reason)414 void AbilityConnectionSession::NotifyCollabErrorResultFromSa(bool isConnected,
415 const ConnectErrorCode errorCode, const std::string& reason)
416 {
417 HILOGE("notify result from sa %{public}d", sessionId_);
418 ConnectResult connectResult(isConnected, errorCode, reason);
419 connectResult.sessionId = sessionId_;
420 Release();
421 ExeuteConnectCallback(connectResult);
422 }
423
HandleDisconnect()424 int32_t AbilityConnectionSession::HandleDisconnect()
425 {
426 HILOGI("called.");
427 {
428 std::shared_lock<std::shared_mutex> sessionStatusReadLock(sessionMutex_);
429 if (sessionStatus_ == SessionStatus::UNCONNECTED) {
430 HILOGI("already disconnect");
431 return ERR_OK;
432 }
433 }
434 Release();
435 std::shared_ptr<IAbilityConnectionSessionListener> listener;
436 {
437 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
438 listener = sessionListener_;
439 }
440 if (listener) {
441 HILOGI("handler sessionListener");
442 listener->OnDisConnect(sessionId_);
443 } else {
444 EventCallbackInfo callbackInfo;
445 callbackInfo.sessionId = sessionId_;
446 callbackInfo.reason = DisconnectReason::PEER_APP_EXIT;
447 ExeuteEventCallback(EVENT_DISCONNECT, callbackInfo);
448 }
449 return ERR_OK;
450 }
451
SendMessage(const std::string & msg,const MessageType & messageType)452 int32_t AbilityConnectionSession::SendMessage(const std::string& msg, const MessageType& messageType)
453 {
454 HILOGI("called.");
455 auto sendData = std::make_shared<AVTransDataBuffer>(msg.length() + 1);
456 int32_t ret = memcpy_s(sendData->Data(), sendData->Capacity(), msg.c_str(), msg.length());
457 if (ret != ERR_OK) {
458 HILOGE("memory copy failed, ret %{public}d", ret);
459 return ret;
460 }
461
462 uint32_t totalLen = sendData->Size() + MessageDataHeader::HEADER_LEN;
463
464 MessageDataHeader headerPara(PROTOCOL_VERSION, static_cast<uint32_t>(messageType), totalLen);
465 auto headerBuffer = headerPara.Serialize();
466 auto sendBuffer = std::make_shared<AVTransDataBuffer>(totalLen);
467 uint8_t* header = sendBuffer->Data();
468
469 if (memcpy_s(header, sendBuffer->Size(), headerBuffer->Data(), MessageDataHeader::HEADER_LEN) != ERR_OK) {
470 HILOGE("Write header failed");
471 return WRITE_SESSION_HEADER_FAILED;
472 }
473
474 ret = memcpy_s(header + MessageDataHeader::HEADER_LEN, sendBuffer->Size() - MessageDataHeader::HEADER_LEN,
475 sendData->Data(), sendData->Size());
476 if (ret != ERR_OK) {
477 HILOGE("Write data failed");
478 return WRITE_SEND_DATA_BUFFER_FAILED;
479 }
480
481 TransChannelInfo transChannelInfo;
482 ret = GetTransChannelInfo(TransChannelType::MESSAGE, transChannelInfo);
483 if (ret != ERR_OK) {
484 HILOGE("message channel not exit");
485 return ret;
486 }
487
488 int32_t channelId = transChannelInfo.channelId;
489 ret = ChannelManager::GetInstance().SendMessage(channelId, sendBuffer);
490 if (ret != ERR_OK) {
491 HILOGE("send message failed, channelId is %{public}d", channelId);
492 return ret;
493 }
494 return ERR_OK;
495 }
496
SendData(const std::shared_ptr<AVTransDataBuffer> & buffer)497 int32_t AbilityConnectionSession::SendData(const std::shared_ptr<AVTransDataBuffer>& buffer)
498 {
499 HILOGI("called.");
500 TransChannelInfo transChannelInfo;
501 int32_t ret = GetTransChannelInfo(TransChannelType::DATA, transChannelInfo);
502 if (ret != ERR_OK) {
503 HILOGE("data channel not exit");
504 return ret;
505 }
506
507 int32_t channelId = transChannelInfo.channelId;
508 ret = ChannelManager::GetInstance().SendBytes(channelId, buffer);
509 if (ret != ERR_OK) {
510 HILOGE("send bytes failed, channelId is %{public}d", channelId);
511 return ret;
512 }
513 return ERR_OK;
514 }
515
SendImage(const std::shared_ptr<Media::PixelMap> & image,int32_t imageQuality)516 int32_t AbilityConnectionSession::SendImage(const std::shared_ptr<Media::PixelMap>& image, int32_t imageQuality)
517 {
518 HILOGI("called.");
519 if (senderEngine_ == nullptr) {
520 HILOGE("senderEngine_ is nullptr.");
521 return INVALID_PARAMETERS_ERR;
522 }
523 int32_t ret = senderEngine_->SendPixelMap(image, imageQuality);
524 if (ret != ERR_OK) {
525 HILOGE("Send image failed, ret is %{public}d.", ret);
526 return ret;
527 }
528 return ERR_OK;
529 }
530
SendFile(const std::vector<std::string> & sFiles,const std::vector<std::string> & dFiles)531 int32_t AbilityConnectionSession::SendFile(const std::vector<std::string>& sFiles,
532 const std::vector<std::string>& dFiles)
533 {
534 HILOGI("called.");
535 TransChannelInfo transChannelInfo;
536 int32_t ret = GetTransChannelInfo(TransChannelType::SEND_FILE, transChannelInfo);
537 if (ret != ERR_OK) {
538 HILOGE("data channel not exit");
539 return ret;
540 }
541
542 int32_t channelId = transChannelInfo.channelId;
543 ret = ChannelManager::GetInstance().SendFile(channelId, sFiles, dFiles);
544 if (ret != ERR_OK) {
545 HILOGE("send bytes failed, channelId is %{public}d", channelId);
546 return ret;
547 }
548 return ERR_OK;
549 }
550
CreateStream(int32_t streamId,const StreamParams & param)551 int32_t AbilityConnectionSession::CreateStream(int32_t streamId, const StreamParams& param)
552 {
553 HILOGI("called. StreamParams role is %{public}d", static_cast<int32_t>(param.role));
554 streamParam_ = param;
555 switch (param.role) {
556 case StreamRole::SOURCE:
557 streamId_ = streamId;
558 return InitSenderEngine();
559 case StreamRole::SINK:
560 streamId_ = streamId;
561 return InitRecvEngine();
562 default:
563 HILOGE("Unrecognized streamRole.");
564 return INVALID_PARAMETERS_ERR;
565 }
566 }
567
InitSenderEngine()568 int32_t AbilityConnectionSession::InitSenderEngine()
569 {
570 if (!connectOption_.needSendStream) {
571 HILOGE("The stream sending option is not configured.");
572 return INVALID_PARAMETERS_ERR;
573 }
574 std::unique_lock<std::shared_mutex> listenerWriteLock(engineMutex_);
575 if (senderEngine_ != nullptr) {
576 HILOGE("The stream sender engine has init.");
577 return ONLY_SUPPORT_ONE_STREAM;
578 }
579 senderEngine_ = std::make_shared<AVSenderEngine>(DEFAULT_APP_UID, DEFAULT_APP_PID,
580 sessionInfo_.localInfo_.bundleName, DEFAULT_INSTANCE_ID);
581 senderEngine_->Init();
582 return ERR_OK;
583 }
584
InitRecvEngine()585 int32_t AbilityConnectionSession::InitRecvEngine()
586 {
587 if (!connectOption_.needReceiveStream) {
588 HILOGE("The stream receive option is not configured.");
589 return INVALID_PARAMETERS_ERR;
590 }
591 std::unique_lock<std::shared_mutex> listenerWriteLock(engineMutex_);
592 if (recvEngine_ != nullptr) {
593 HILOGE("The stream receive has init.");
594 return ONLY_SUPPORT_ONE_STREAM;
595 }
596
597 recvEngine_ = std::make_shared<AVReceiverEngine>();
598 recvEngine_->Init();
599 return ERR_OK;
600 }
601
DestroyStream()602 int32_t AbilityConnectionSession::DestroyStream()
603 {
604 HILOGI("called.");
605 senderEngine_ = nullptr;
606 recvEngine_ = nullptr;
607 recvEngineState_ = EngineState::EMPTY;
608 TransChannelInfo info;
609 std::shared_lock<std::shared_mutex> channelReadLock(transChannelMutex_);
610 auto item = transChannels_.find(TransChannelType::STREAM);
611 if (item != transChannels_.end() && item->second.isConnected) {
612 info = item->second;
613 }
614 ChannelManager::GetInstance().ClearSendTask(info.channelId);
615 HILOGI("stream bytes channel clear task");
616 item = transChannels_.find(TransChannelType::STREAM_BYTES);
617 if (item != transChannels_.end() && item->second.isConnected) {
618 info = item->second;
619 }
620 ChannelManager::GetInstance().ClearSendTask(info.channelId);
621 HILOGI("stream bytes channel clear task");
622 return ERR_OK;
623 }
624
625 template <typename T>
ConfigEngineParam(std::shared_ptr<T> & engine,const SurfaceParams & param)626 int32_t AbilityConnectionSession::ConfigEngineParam(std::shared_ptr<T> &engine, const SurfaceParams& param)
627 {
628 engine->SetVideoSource(static_cast<VideoSourceType>(param.format));
629
630 int32_t ret = ERR_OK;
631 ret = engine->Configure(VidEnc(VideoCodecFormat::H264));
632 if (ret != ERR_OK) {
633 HILOGE("configure videnc failed");
634 return ret;
635 }
636 ret = engine->Configure(VidRectangle(param.width, param.height));
637 if (ret != ERR_OK) {
638 HILOGE("configure VidRectangle failed");
639 return ret;
640 }
641 ret = engine->Configure(VidBitRate(streamParam_.bitrate));
642 if (ret != ERR_OK) {
643 HILOGE("configure VidBitRate failed");
644 return ret;
645 }
646 ret = engine->Configure(VidFrameRate(VIDEO_FRAME_RATE));
647 if (ret != ERR_OK) {
648 HILOGE("configure VidFrameRate failed");
649 return ret;
650 }
651 ret = engine->Configure(VidIsHdr(false));
652 if (ret != ERR_OK) {
653 HILOGE("configure VidIsHdr failed");
654 return ret;
655 }
656 ret = engine->Configure(VidEnableTemporalScale(false));
657 if (ret != ERR_OK) {
658 HILOGE("configure VidEnableTemporalScale failed");
659 return ret;
660 }
661 ret = engine->Configure(VidSurfaceParam(ConvertToSurfaceParam(param)));
662 if (ret != ERR_OK) {
663 HILOGE("configure VidSurfaceParam failed");
664 return ret;
665 }
666 return ERR_OK;
667 }
668
GetSurfaceId(const SurfaceParams & param,std::string & surfaceId)669 int32_t AbilityConnectionSession::GetSurfaceId(const SurfaceParams& param, std::string& surfaceId)
670 {
671 HILOGI("called.");
672 if (senderEngine_ == nullptr) {
673 HILOGE("senderEngine_ Uninitialized.");
674 return INVALID_PARAMETERS_ERR;
675 }
676 int32_t ret = ConfigEngineParam(senderEngine_, param);
677 if (ret != ERR_OK) {
678 HILOGE("config senderEngine param failed.");
679 return ret;
680 }
681
682 ret = senderEngine_->Prepare();
683 if (ret != ERR_OK) {
684 HILOGE("error prepare senderEngine_");
685 return ret;
686 }
687
688 TransChannelInfo info;
689 if (GetStreamTransChannel(info) != ERR_OK) {
690 HILOGE("senderEngine_ SetChannelListener failed");
691 return INVALID_PARAMETERS_ERR;
692 }
693
694 HILOGI("SetChannelListener channelId is %{public}d, channelType is %{public}d",
695 info.channelId, static_cast<int32_t>(info.channelType));
696 senderEngine_->SetTransChannel(info.channelId, info.channelType);
697 surfaceId = std::to_string(senderEngine_->GetSurface());
698 return ERR_OK;
699 }
700
SetSurfaceId(const std::string & surfaceId,const SurfaceParams & param)701 int32_t AbilityConnectionSession::SetSurfaceId(const std::string& surfaceId,
702 const SurfaceParams& param)
703 {
704 HILOGI("called.");
705 if (recvEngine_ == nullptr) {
706 HILOGE("recvEngine_ Uninitialized.");
707 return INVALID_PARAMETERS_ERR;
708 }
709 int32_t ret = ConfigEngineParam(recvEngine_, param);
710 if (ret != ERR_OK) {
711 HILOGE("recvEngine_ ConfigEngineParam failed.");
712 return ret;
713 }
714
715 uint64_t value = 0;
716 auto result = std::from_chars(surfaceId.data(), surfaceId.data() + surfaceId.size(), value);
717 if (result.ec != std::errc()) {
718 HILOGE("Get value failed");
719 return INVALID_PARAMETERS_ERR;
720 }
721 ret = recvEngine_->SetVideoSurface(value);
722 if (ret != ERR_OK) {
723 HILOGE("error set video surface!");
724 return ret;
725 }
726
727 ret = recvEngine_->Prepare();
728 if (ret != ERR_OK) {
729 HILOGE("error prepare recvEngine_");
730 return ret;
731 }
732
733 TransChannelInfo info;
734 if (GetStreamTransChannel(info) != ERR_OK) {
735 HILOGE("recvEngine_ SetChannelListener failed");
736 return INVALID_PARAMETERS_ERR;
737 }
738 HILOGE("SetChannelListener channelId is %{public}d, channelType is %{public}d",
739 info.channelId, static_cast<int32_t>(info.channelType));
740 recvEngine_->SetChannelListener(info.channelId);
741 pixelMapListener = std::make_shared<PixelMapListener>(shared_from_this());
742 recvEngine_->SetEngineListener(pixelMapListener);
743 return ERR_OK;
744 }
745
UpdateSurfaceParam(const SurfaceParams & surfaceParam)746 int32_t AbilityConnectionSession::UpdateSurfaceParam(const SurfaceParams& surfaceParam)
747 {
748 SurfaceParam param = ConvertToSurfaceParam(surfaceParam);
749 HILOGI("SurfaceParam rotate is %{public}d, filp is %{public}d.",
750 static_cast<int32_t>(param.rotate), static_cast<int32_t>(param.filp));
751 if (senderEngine_ != nullptr) {
752 HILOGI("Update senderEngine_ SurfaceParam.");
753 senderEngine_->SetSurfaceParam(param);
754 return ERR_OK;
755 }
756
757 if (recvEngine_ != nullptr) {
758 HILOGI("Update recvEngine_ SurfaceParam.");
759 recvEngine_ -> OnRecvSurfaceParam(param);
760 return ERR_OK;
761 }
762
763 HILOGE("senderEngine_ and recvEngine_ is nullptr!");
764 return INVALID_PARAMETERS_ERR;
765 }
766
ConvertToSurfaceParam(const SurfaceParams & param)767 SurfaceParam AbilityConnectionSession::ConvertToSurfaceParam(const SurfaceParams& param)
768 {
769 SurfaceParam surfaveParam;
770 switch (param.rotation) {
771 case SURFACE_ROTATE_NONE:
772 surfaveParam.rotate = SurfaceRotate::ROTATE_NONE;
773 break;
774 case SURFACE_ROTATE_90:
775 surfaveParam.rotate = SurfaceRotate::ROTATE_90;
776 break;
777 case SURFACE_ROTATE_180:
778 surfaveParam.rotate = SurfaceRotate::ROTATE_180;
779 break;
780 case SURFACE_ROTATE_270:
781 surfaveParam.rotate = SurfaceRotate::ROTATE_270;
782 break;
783 default:
784 surfaveParam.rotate = SurfaceRotate::ROTATE_NONE;
785 break;
786 }
787
788 switch (param.flip) {
789 case FlipOptions::HORIZONTAL:
790 surfaveParam.filp = SurfaceFilp::FLIP_H;
791 break;
792 case FlipOptions::VERTICAL:
793 surfaveParam.filp = SurfaceFilp::FLIP_V;
794 break;
795 default:
796 surfaveParam.filp = SurfaceFilp::FLIP_NONE;
797 break;
798 }
799
800 return surfaveParam;
801 }
802
GetStreamTransChannel(TransChannelInfo & info)803 int32_t AbilityConnectionSession::GetStreamTransChannel(TransChannelInfo& info)
804 {
805 std::shared_lock<std::shared_mutex> channelReadLock(transChannelMutex_);
806 auto item = transChannels_.find(TransChannelType::STREAM);
807 if (item != transChannels_.end() && item->second.isConnected) {
808 info = item->second;
809 return ERR_OK;
810 }
811 HILOGW("stream channel unconnected");
812 item = transChannels_.find(TransChannelType::STREAM_BYTES);
813 if (item != transChannels_.end() && item->second.isConnected) {
814 info = item->second;
815 return ERR_OK;
816 }
817 HILOGE("bytes stream channel unconnected");
818 return INVALID_PARAMETERS_ERR;
819 }
820
StartStream(int32_t streamId)821 int32_t AbilityConnectionSession::StartStream(int32_t streamId)
822 {
823 HILOGI("called.");
824 if (connectOption_.needSendStream && senderEngine_ != nullptr) {
825 return StartSenderEngine();
826 }
827
828 if (connectOption_.needReceiveStream && recvEngine_ != nullptr) {
829 return StartRecvEngine();
830 }
831 HILOGE("not config stream option or engine is null.");
832 return INVALID_PARAMETERS_ERR;
833 }
834
StartRecvEngine()835 int32_t AbilityConnectionSession::StartRecvEngine()
836 {
837 HILOGI("recvEngine_ Start.");
838 int32_t ret = recvEngine_->Start();
839 if (ret != ERR_OK) {
840 HILOGE("recvEngine_ start failed.");
841 return ret;
842 }
843 return SendMessage("recvEngineStart", MessageType::RECEIVE_STREAM_START);
844 }
845
UpdateRecvEngineStatus()846 void AbilityConnectionSession::UpdateRecvEngineStatus()
847 {
848 recvEngineState_ = EngineState::START;
849 }
850
StartSenderEngine()851 int32_t AbilityConnectionSession::StartSenderEngine()
852 {
853 HILOGI("senderEngine_ Start. recvEngineState_ is %{public}d", static_cast<int32_t>(recvEngineState_));
854 if (recvEngineState_ != EngineState::START) {
855 HILOGE("recvEngine not start");
856 return RECEIVE_STREAM_NOT_START;
857 }
858 return senderEngine_->Start();
859 }
860
StopStream(int32_t streamId)861 int32_t AbilityConnectionSession::StopStream(int32_t streamId)
862 {
863 HILOGI("called.");
864 if (connectOption_.needSendStream && senderEngine_ != nullptr) {
865 HILOGI("senderEngine_ Stop.");
866 return senderEngine_->Stop();
867 }
868
869 if (connectOption_.needReceiveStream && recvEngine_ != nullptr) {
870 HILOGI("recvEngine_ Stop.");
871 return recvEngine_->Stop();
872 }
873 return ERR_OK;
874 }
875
RegisterEventCallback(const std::string & eventType,const std::shared_ptr<JsAbilityConnectionSessionListener> & listener)876 int32_t AbilityConnectionSession::RegisterEventCallback(const std::string& eventType,
877 const std::shared_ptr<JsAbilityConnectionSessionListener>& listener)
878 {
879 HILOGI("called.");
880 std::unique_lock<std::shared_mutex> listenerWriteLock(listenerMutex_);
881 listeners_[eventType] = listener;
882 return ERR_OK;
883 }
884
UnregisterEventCallback(const std::string & eventType)885 int32_t AbilityConnectionSession::UnregisterEventCallback(const std::string& eventType)
886 {
887 HILOGI("called.");
888 std::unique_lock<std::shared_mutex> listenerWriteLock(listenerMutex_);
889 listeners_.erase(eventType);
890 return ERR_OK;
891 }
892
RegisterEventCallback(const std::shared_ptr<IAbilityConnectionSessionListener> & listener)893 int32_t AbilityConnectionSession::RegisterEventCallback(
894 const std::shared_ptr<IAbilityConnectionSessionListener>& listener)
895 {
896 if (listener == nullptr) {
897 HILOGE("listener empty");
898 return INVALID_LISTENER;
899 }
900 std::unique_lock<std::shared_mutex> lock(sessionListenerMutex_);
901 sessionListener_ = listener;
902 return ERR_OK;
903 }
904
UnregisterEventCallback()905 int32_t AbilityConnectionSession::UnregisterEventCallback()
906 {
907 std::unique_lock<std::shared_mutex> lock(sessionListenerMutex_);
908 sessionListener_ = nullptr;
909 return ERR_OK;
910 }
911
ExeuteEventCallback(const std::string & eventType,EventCallbackInfo & info)912 int32_t AbilityConnectionSession::ExeuteEventCallback(const std::string& eventType, EventCallbackInfo& info)
913 {
914 return ExeuteEventCallbackTemplate(eventType, info);
915 }
916
ExeuteEventCallback(const std::string & eventType,CollaborateEventInfo & info)917 int32_t AbilityConnectionSession::ExeuteEventCallback(const std::string& eventType, CollaborateEventInfo& info)
918 {
919 return ExeuteEventCallbackTemplate(eventType, info);
920 }
921
922 template <typename T>
ExeuteEventCallbackTemplate(const std::string & eventType,T & info)923 int32_t AbilityConnectionSession::ExeuteEventCallbackTemplate(const std::string& eventType, T& info)
924 {
925 HILOGI("called, eventType is %{public}s", eventType.c_str());
926 std::shared_lock<std::shared_mutex> listenerReadLock(listenerMutex_);
927 if (listeners_.empty()) {
928 HILOGE("listeners_ is empty");
929 return INVALID_PARAMETERS_ERR;
930 }
931
932 auto item = listeners_.find(eventType);
933 if (item == listeners_.end()) {
934 HILOGE("The event callback is not registered. event is %{public}s", eventType.c_str());
935 return INVALID_PARAMETERS_ERR;
936 }
937
938 if constexpr (std::is_same_v<T, EventCallbackInfo>) {
939 info.eventType = eventType;
940 }
941 auto eventCallback = item->second;
942 if (eventCallback == nullptr) {
943 HILOGE("eventCallback is nullptr");
944 return INVALID_PARAMETERS_ERR;
945 }
946 eventCallback->CallJsMethod(info);
947 return ERR_OK;
948 }
949
InitChannels()950 int32_t AbilityConnectionSession::InitChannels()
951 {
952 HILOGI("called.");
953 channelName_ = GetChannelName(sessionInfo_);
954 channelListener_ = std::make_shared<CollabChannelListener>(shared_from_this());
955 bool isClientChannel = direction_ == CollabrateDirection::COLLABRATE_SOURCE;
956 int32_t ret = CreateChannel(channelName_, ChannelDataType::MESSAGE, TransChannelType::MESSAGE, isClientChannel);
957 if (ret != ERR_OK) {
958 HILOGE("create message channel failed!");
959 return FAILED_TO_CREATE_MESSAGE_CHANNEL;
960 }
961
962 if (connectOption_.needSendData &&
963 CreateChannel(channelName_, ChannelDataType::BYTES, TransChannelType::DATA, isClientChannel) != ERR_OK) {
964 HILOGE("create data channel failed!");
965 return FAILED_TO_CREATE_DATA_CHANNEL;
966 }
967
968 if ((connectOption_.needSendStream || connectOption_.needReceiveStream) &&
969 CreateStreamChannel(channelName_, isClientChannel) != ERR_OK) {
970 HILOGE("create stream channel failed!");
971 return FAILED_TO_CREATE_STREAM_CHANNEL;
972 }
973
974 if (connectOption_.needSendFile && isClientChannel &&
975 CreateChannel(channelName_, ChannelDataType::FILE, TransChannelType::SEND_FILE, true) != ERR_OK) {
976 HILOGE("create send file channel failed!");
977 return FAILED_TO_CREATE_SEND_FILE_CHANNEL;
978 }
979
980 if (connectOption_.needReceiveFile &&
981 CreateChannel(channelName_, ChannelDataType::FILE, TransChannelType::RECEIVE_FILE, false) != ERR_OK) {
982 HILOGE("create receive file channel failed!");
983 return FAILED_TO_CREATE_RECEIVE_FILE_CHANNEL;
984 }
985 return ERR_OK;
986 }
987
CreateStreamChannel(const std::string & channelName,bool isClientChannel)988 int32_t AbilityConnectionSession::CreateStreamChannel(const std::string& channelName, bool isClientChannel)
989 {
990 std::string streamChannelName = channelName + "stream";
991 int32_t ret = CreateChannel(streamChannelName, ChannelDataType::BYTES, TransChannelType::STREAM_BYTES,
992 isClientChannel);
993 if (ret != ERR_OK) {
994 HILOGE("init bytes channel failed!");
995 return INVALID_PARAMETERS_ERR;
996 }
997
998 ret = CreateChannel(streamChannelName, ChannelDataType::VIDEO_STREAM, TransChannelType::STREAM, isClientChannel);
999 if (ret != ERR_OK) {
1000 HILOGE("init bytes channel failed!");
1001 return INVALID_PARAMETERS_ERR;
1002 }
1003 return ERR_OK;
1004 }
1005
CreateChannel(const std::string & channelName,const ChannelDataType & dataType,const TransChannelType & channelType,bool isClientChannel)1006 int32_t AbilityConnectionSession::CreateChannel(const std::string& channelName, const ChannelDataType& dataType,
1007 const TransChannelType& channelType, bool isClientChannel)
1008 {
1009 HILOGI("called.");
1010 ChannelPeerInfo channelPeerInfo = { peerSocketName_, sessionInfo_.peerInfo_.deviceId };
1011 ChannelManager &channelManager = ChannelManager::GetInstance();
1012 int32_t channelId = isClientChannel ?
1013 channelManager.CreateClientChannel(channelName, dataType, channelPeerInfo) :
1014 channelManager.CreateServerChannel(channelName, dataType, channelPeerInfo);
1015 if (!channelManager.isValidChannelId(channelId)) {
1016 HILOGE("CreateChannel failed, channelId is %{public}d", channelId);
1017 return INVALID_PARAMETERS_ERR;
1018 }
1019
1020 if (channelManager.RegisterChannelListener(channelId, channelListener_) != ERR_OK) {
1021 HILOGE("register channel listener failed, channelId is %{public}d", channelId);
1022 return INVALID_PARAMETERS_ERR;
1023 }
1024
1025 std::unique_lock<std::shared_mutex> channelWriteLock(transChannelMutex_);
1026 TransChannelInfo channelInfo = {channelId, dataType, channelType, false};
1027 transChannels_.emplace(channelType, channelInfo);
1028 return ERR_OK;
1029 }
1030
GetChannelName(const AbilityConnectionSessionInfo & sessionInfo)1031 std::string AbilityConnectionSession::GetChannelName(const AbilityConnectionSessionInfo& sessionInfo)
1032 {
1033 PeerInfo localInfo = sessionInfo.localInfo_;
1034 PeerInfo peerInfo = sessionInfo.peerInfo_;
1035 bool isClientChannel = direction_ == CollabrateDirection::COLLABRATE_SOURCE;
1036 std::string input = isClientChannel ?
1037 localInfo.moduleName + localInfo.abilityName + peerInfo.moduleName + peerInfo.abilityName :
1038 peerInfo.moduleName + peerInfo.abilityName + localInfo.moduleName + localInfo.abilityName;
1039
1040 unsigned char hash[SHA256_DIGEST_LENGTH];
1041 SHA256((const unsigned char*)input.c_str(), input.length(), hash);
1042
1043 std::stringstream hashStr;
1044 for (int i = 0; i < SHA256_DIGEST_LENGTH; ++i) {
1045 hashStr << std::hex << std::setw(HEX_WIDTH) << std::setfill(FILL_CHAR) << (int)hash[i];
1046 }
1047
1048 std::string channelName = hashStr.str().substr(0, CHANNEL_NAME_LENGTH);
1049 return channelName;
1050 }
1051
ConnectChannels()1052 int32_t AbilityConnectionSession::ConnectChannels()
1053 {
1054 HILOGI("parallel connect channels");
1055 std::vector<std::future<int32_t>> futures;
1056 // message
1057 futures.emplace_back(std::async(std::launch::async, [this]() -> int32_t {
1058 if (ConnectTransChannel(TransChannelType::MESSAGE) != ERR_OK) {
1059 HILOGE("connect message channel failed.");
1060 return CONNECT_MESSAGE_CHANNEL_FAILED;
1061 }
1062 return ERR_OK;
1063 }));
1064 // data
1065 if (connectOption_.needSendData) {
1066 futures.emplace_back(std::async(std::launch::async, [this]() -> int32_t {
1067 if (ConnectTransChannel(TransChannelType::DATA) != ERR_OK) {
1068 HILOGE("connect data channel failed.");
1069 return CONNECT_DATA_CHANNEL_FAILED;
1070 }
1071 return ERR_OK;
1072 }));
1073 }
1074 // stream
1075 if (connectOption_.needSendStream || connectOption_.needReceiveStream) {
1076 ConnectStreamChannel();
1077 futures.emplace_back(std::async(std::launch::async, [this]() -> int32_t {
1078 if (ConnectTransChannel(TransChannelType::STREAM_BYTES) != ERR_OK) {
1079 HILOGE("connect stream channel failed.");
1080 return CONNECT_STREAM_CHANNEL_FAILED;
1081 }
1082 return ERR_OK;
1083 }));
1084 }
1085 // file
1086 if (connectOption_.needSendFile && ConnectTransChannel(TransChannelType::SEND_FILE) != ERR_OK) {
1087 HILOGE("connect send file channel failed.");
1088 return CONNECT_SEND_FILE_CHANNEL_FAILED;
1089 }
1090 // wait for task
1091 for (auto&& future : futures) {
1092 int32_t result = future.get();
1093 if (result != ERR_OK) {
1094 return result;
1095 }
1096 }
1097 return ERR_OK;
1098 }
1099
ConnectTransChannel(const TransChannelType channelType)1100 int32_t AbilityConnectionSession::ConnectTransChannel(const TransChannelType channelType)
1101 {
1102 TransChannelInfo info;
1103 int32_t ret = GetTransChannelInfo(channelType, info);
1104 if (ret != ERR_OK) {
1105 HILOGE("stream channel not exits!");
1106 return STREAM_CHANNEL_NOT_EXITS;
1107 }
1108
1109 ret = ChannelManager::GetInstance().ConnectChannel(info.channelId);
1110 if (ret != ERR_OK) {
1111 HILOGE("connect channel failed. ret is %{public}d", ret);
1112 return ret;
1113 }
1114 HILOGI("connect channel success, channel type is %{public}d", static_cast<int32_t>(channelType));
1115 UpdateTransChannelStatus(info.channelId, true);
1116 return ERR_OK;
1117 }
1118
ConnectStreamChannel()1119 int32_t AbilityConnectionSession::ConnectStreamChannel()
1120 {
1121 HILOGI("called.");
1122 if (!connectOption_.needSendStream && !connectOption_.needReceiveStream) {
1123 HILOGI("Streaming is not required.");
1124 return ERR_OK;
1125 }
1126
1127 TransChannelInfo info;
1128 int32_t ret = GetTransChannelInfo(TransChannelType::STREAM, info);
1129 if (ret != ERR_OK) {
1130 HILOGE("stream channel not exits!");
1131 return INVALID_PARAMETERS_ERR;
1132 }
1133
1134 if (info.isConnected) {
1135 HILOGE("stream channel has connected.");
1136 return ERR_OK;
1137 }
1138
1139 if (direction_ != CollabrateDirection::COLLABRATE_SOURCE) {
1140 HILOGI("notify source connect stream channel.");
1141 SendMessage("WIFI_OPEN", MessageType::WIFI_OPEN);
1142 return ERR_OK;
1143 }
1144
1145 std::thread task([this, info]() {
1146 DoConnectStreamChannel(info.channelId);
1147 });
1148 task.detach();
1149 return ERR_OK;
1150 }
1151
DoConnectStreamChannel(int32_t channelId)1152 int32_t AbilityConnectionSession::DoConnectStreamChannel(int32_t channelId)
1153 {
1154 HILOGI("called.");
1155 int32_t ret = ChannelManager::GetInstance().ConnectChannel(channelId);
1156 if (ret != ERR_OK) {
1157 HILOGE("stream channel bind failed, ret is %{public}d", ret);
1158 return ret;
1159 }
1160 HILOGI("stream channel bind success");
1161 UpdateTransChannelStatus(channelId, true);
1162
1163 if (recvEngine_ == nullptr) {
1164 HILOGI("notify peer update recvEngine channel.");
1165 SendMessage("updateRecvEngineTransChannel", MessageType::UPDATE_RECV_ENGINE_CHANNEL);
1166 return ERR_OK;
1167 }
1168 UpdateRecvEngineTransChannel();
1169 return ERR_OK;
1170 }
1171
GetTransChannelInfo(const TransChannelType & type,TransChannelInfo & info)1172 int32_t AbilityConnectionSession::GetTransChannelInfo(const TransChannelType& type, TransChannelInfo& info)
1173 {
1174 HILOGD("called.");
1175 std::shared_lock<std::shared_mutex> channelReadLock(transChannelMutex_);
1176 auto item = transChannels_.find(type);
1177 if (item == transChannels_.end()) {
1178 return INVALID_PARAMETERS_ERR;
1179 }
1180 info = item->second;
1181 return ERR_OK;
1182 }
1183
OnChannelConnect(int32_t channelId)1184 void AbilityConnectionSession::OnChannelConnect(int32_t channelId)
1185 {
1186 HILOGI("called. channelId is %{public}d", channelId);
1187 if (!IsVaildChannel(channelId)) {
1188 HILOGE("is vaild channelId");
1189 return;
1190 }
1191
1192 UpdateTransChannelStatus(channelId, true);
1193 if (IsAllChannelConnected() && !connectOption_.HasFileTransfer()) {
1194 HandleSessionConnect();
1195 }
1196 }
1197
HandleSessionConnect()1198 void AbilityConnectionSession::HandleSessionConnect()
1199 {
1200 HILOGI("called.");
1201 std::unique_lock<std::shared_mutex> sessionStatusWriteLock(sessionMutex_);
1202 if (sessionStatus_ == SessionStatus::CONNECTED) {
1203 HILOGI("session has connected.");
1204 return;
1205 }
1206 sessionStatus_ = SessionStatus::CONNECTED;
1207
1208 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1209 {
1210 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1211 listener = sessionListener_;
1212 }
1213 if (listener) {
1214 HILOGI("handler sessionListener");
1215 listener->OnConnect(sessionId_);
1216 } else {
1217 EventCallbackInfo callbackInfo;
1218 callbackInfo.sessionId = sessionId_;
1219 ExeuteEventCallback(EVENT_CONNECT, callbackInfo);
1220 }
1221 }
1222
UpdateTransChannelStatus(int32_t channelId,bool isConnected)1223 void AbilityConnectionSession::UpdateTransChannelStatus(int32_t channelId, bool isConnected)
1224 {
1225 std::unique_lock<std::shared_mutex> channelWriteLock(transChannelMutex_);
1226 for (auto& iter : transChannels_) {
1227 if (iter.second.channelId == channelId) {
1228 HILOGI("transType is %{public}d.", static_cast<int32_t>(iter.second.transType));
1229 iter.second.isConnected = isConnected;
1230 }
1231 }
1232 }
1233
IsAllChannelConnected()1234 bool AbilityConnectionSession::IsAllChannelConnected()
1235 {
1236 HILOGD("called.");
1237 std::shared_lock<std::shared_mutex> channelReadLock(transChannelMutex_);
1238 for (auto& iter : transChannels_) {
1239 TransChannelInfo info = iter.second;
1240 if (!info.isConnected && info.transType != TransChannelType::STREAM) {
1241 HILOGI("transType is %{public}d.", static_cast<int32_t>(info.transType));
1242 return false;
1243 }
1244 }
1245 HILOGI("AllChannelConnected.");
1246 return true;
1247 }
1248
OnChannelClosed(int32_t channelId,const ShutdownReason & reason)1249 void AbilityConnectionSession::OnChannelClosed(int32_t channelId, const ShutdownReason& reason)
1250 {
1251 HILOGI("called. channelId is %{public}d", channelId);
1252 if (!IsVaildChannel(channelId)) {
1253 HILOGE("is vaild channelId");
1254 return;
1255 }
1256
1257 if (!IsConnected()) {
1258 HILOGE("session is not connected.");
1259 return;
1260 }
1261
1262 HILOGI("notidy app disconnect");
1263 Disconnect();
1264
1265 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1266 {
1267 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1268 listener = sessionListener_;
1269 }
1270 if (listener) {
1271 HILOGI("handler sessionListener");
1272 listener->OnDisConnect(sessionId_);
1273 } else {
1274 EventCallbackInfo callbackInfo;
1275 callbackInfo.sessionId = sessionId_;
1276 callbackInfo.reason = ConvertToDisconnectReason(reason);
1277 ExeuteEventCallback(EVENT_DISCONNECT, callbackInfo);
1278 }
1279 }
1280
ConvertToDisconnectReason(const ShutdownReason & reason)1281 DisconnectReason AbilityConnectionSession::ConvertToDisconnectReason(const ShutdownReason& reason)
1282 {
1283 HILOGI("Shutdown reason code is %{public}d.", static_cast<int32_t>(reason));
1284 switch (reason) {
1285 case ShutdownReason::SHUTDOWN_REASON_PEER:
1286 return DisconnectReason::PEER_APP_EXIT;
1287 default:
1288 return DisconnectReason::NETWORK_DISCONNECTED;
1289 }
1290 }
1291
OnMessageReceived(int32_t channelId,const std::shared_ptr<AVTransDataBuffer> dataBuffer)1292 void AbilityConnectionSession::OnMessageReceived(int32_t channelId, const std::shared_ptr<AVTransDataBuffer> dataBuffer)
1293 {
1294 HILOGI("called.");
1295 if (!IsVaildChannel(channelId)) {
1296 return;
1297 }
1298 uint8_t *data = dataBuffer->Data();
1299 auto headerPara = MessageDataHeader::Deserialize(data, dataBuffer->Size());
1300 if (!headerPara) {
1301 HILOGE("read session header from buffer failed");
1302 return;
1303 }
1304 std::string msg(reinterpret_cast<const char *>(data + MessageDataHeader::HEADER_LEN),
1305 dataBuffer->Size() - MessageDataHeader::HEADER_LEN);
1306 HILOGI("headerPara type is %{public}d", headerPara->dataType_);
1307 // common handler
1308 auto handler = [this, msg](uint32_t dataType) {
1309 auto iter = messageHandlerMap_.find(dataType);
1310 if (iter != messageHandlerMap_.end()) {
1311 iter->second(msg);
1312 } else {
1313 HILOGE("unhandled code!");
1314 }
1315 };
1316
1317 if (headerPara->dataType_ == static_cast<uint32_t>(MessageType::CONNECT_FILE_CHANNEL)) {
1318 TransChannelInfo info;
1319 int32_t ret = GetTransChannelInfo(TransChannelType::SEND_FILE, info);
1320 if (ret != ERR_OK) {
1321 HILOGI("send file channel now not exists!");
1322 std::thread(handler, headerPara->dataType_).detach(); // only connect file need async
1323 }
1324 } else {
1325 handler(headerPara->dataType_); // keep sync
1326 }
1327 }
1328
OnSendFile(const int32_t channelId,const FileInfo & info)1329 void AbilityConnectionSession::OnSendFile(const int32_t channelId, const FileInfo& info)
1330 {
1331 HILOGI("called.");
1332 if (!IsVaildChannel(channelId)) {
1333 return;
1334 }
1335
1336 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1337 {
1338 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1339 listener = sessionListener_;
1340 }
1341 if (listener == nullptr) {
1342 HILOGE("listener is nullptr");
1343 return;
1344 }
1345 listener->OnSendFile(sessionId_, info);
1346 }
1347
OnRecvFile(const int32_t channelId,const FileInfo & info)1348 void AbilityConnectionSession::OnRecvFile(const int32_t channelId, const FileInfo& info)
1349 {
1350 HILOGI("called.");
1351 if (!IsVaildChannel(channelId)) {
1352 return;
1353 }
1354
1355 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1356 {
1357 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1358 listener = sessionListener_;
1359 }
1360 if (listener == nullptr) {
1361 HILOGE("listener is nullptr");
1362 return;
1363 }
1364 listener->OnRecvFile(sessionId_, info);
1365 }
1366
GetRecvPath(const int32_t channelId)1367 const char* AbilityConnectionSession::GetRecvPath(const int32_t channelId)
1368 {
1369 HILOGI("called.");
1370 if (!IsVaildChannel(channelId)) {
1371 return nullptr;
1372 }
1373
1374 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1375 {
1376 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1377 listener = sessionListener_;
1378 }
1379 if (listener == nullptr) {
1380 HILOGE("listener is nullptr");
1381 return nullptr;
1382 }
1383 return listener->GetRecvPath(sessionId_);
1384 }
1385
ExeuteMessageEventCallback(const std::string msg)1386 void AbilityConnectionSession::ExeuteMessageEventCallback(const std::string msg)
1387 {
1388 HILOGI("called.");
1389 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1390 {
1391 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1392 listener = sessionListener_;
1393 }
1394 // bi-channel need wait
1395 {
1396 std::unique_lock<std::mutex> lock(connectionMutex_);
1397 bool isConnected = connectionCondition_.wait_for(
1398 lock,
1399 std::chrono::seconds(WAIT_FOR_CONNECT),
1400 [this]() { return sessionStatus_ == SessionStatus::CONNECTED; }
1401 );
1402 if (!isConnected) {
1403 HILOGE("Wait for channel connection timed out after %{public}d seconds.", WAIT_FOR_CONNECT);
1404 return;
1405 }
1406 }
1407 HILOGI("start to add msg callback to handler");
1408 if (listener != nullptr) {
1409 HILOGI("handler sessionListener");
1410 listener->OnMessage(sessionId_, msg);
1411 auto func = [listener, msg, this]() {
1412 listener->OnMessage(sessionId_, msg);
1413 };
1414 eventHandler_->PostTask(func, AppExecFwk::EventQueue::Priority::LOW);
1415 } else {
1416 EventCallbackInfo callbackInfo;
1417 callbackInfo.sessionId = sessionId_;
1418 callbackInfo.msg = msg;
1419 auto func = [callbackInfo, this]() mutable {
1420 ExeuteEventCallback(EVENT_RECEIVE_MESSAGE, callbackInfo);
1421 };
1422 eventHandler_->PostTask(func, AppExecFwk::EventQueue::Priority::LOW);
1423 }
1424 }
1425
UpdateRecvEngineTransChannel()1426 void AbilityConnectionSession::UpdateRecvEngineTransChannel()
1427 {
1428 HILOGI("called.");
1429 if (recvEngine_ == nullptr) {
1430 HILOGE("recvEngine_ is nullptr.");
1431 return;
1432 }
1433
1434 TransChannelInfo info;
1435 int32_t ret = GetTransChannelInfo(TransChannelType::STREAM, info);
1436 if (ret != ERR_OK) {
1437 HILOGI("not find stream chennel.");
1438 return;
1439 }
1440 recvEngine_->SetChannelListener(info.channelId);
1441 SendMessage("updateSenderEngineTransChannel", MessageType::UPDATE_SENDER_ENGINE_CHANNEL);
1442 }
1443
UpdateSenderEngineTransChannel()1444 void AbilityConnectionSession::UpdateSenderEngineTransChannel()
1445 {
1446 HILOGI("called.");
1447 if (senderEngine_ == nullptr) {
1448 HILOGE("senderEngine_ is nullptr.");
1449 return;
1450 }
1451
1452 TransChannelInfo info;
1453 int32_t ret = GetTransChannelInfo(TransChannelType::STREAM, info);
1454 if (ret != ERR_OK) {
1455 HILOGI("not find stream chennel.");
1456 return;
1457 }
1458 HILOGI("SetChannelListener channelId is %{public}d, channelType is %{public}d",
1459 info.channelId, static_cast<int32_t>(info.channelType));
1460 senderEngine_->SetTransChannel(info.channelId, info.channelType);
1461 }
1462
ConnectFileChannel(const std::string & peerSocketName)1463 void AbilityConnectionSession::ConnectFileChannel(const std::string& peerSocketName)
1464 {
1465 HILOGI("called.");
1466 peerSocketName_ = peerSocketName;
1467 int32_t ret = CreateChannel(channelName_, ChannelDataType::FILE, TransChannelType::SEND_FILE, true);
1468 if (ret != ERR_OK) {
1469 HILOGE("create send file channel failed!");
1470 return;
1471 }
1472
1473 ret = ConnectTransChannel(TransChannelType::SEND_FILE);
1474 if (ret != ERR_OK) {
1475 HILOGI("connect file chennel failed.");
1476 SendMessage("FILE_CHANNEL_CONNECT_FAILED", MessageType::FILE_CHANNEL_CONNECT_FAILED);
1477 return;
1478 }
1479 SendMessage("FILE_CHANNEL_CONNECT_SUCCESS", MessageType::FILE_CHANNEL_CONNECT_SUCCESS);
1480 HandleSessionConnect();
1481 }
1482
OnRecvPixelMap(const std::shared_ptr<Media::PixelMap> & pixelMap)1483 void AbilityConnectionSession::OnRecvPixelMap(const std::shared_ptr<Media::PixelMap>& pixelMap)
1484 {
1485 HILOGI("called.");
1486 if (pixelMap == nullptr) {
1487 HILOGE("pixelMap is nullptr.");
1488 }
1489
1490 EventCallbackInfo callbackInfo;
1491 callbackInfo.sessionId = sessionId_;
1492 callbackInfo.image = pixelMap;
1493 ExeuteEventCallback(EVENT_RECEIVE_IMAGE, callbackInfo);
1494 }
1495
OnBytesReceived(int32_t channelId,const std::shared_ptr<AVTransDataBuffer> dataBuffer)1496 void AbilityConnectionSession::OnBytesReceived(int32_t channelId, const std::shared_ptr<AVTransDataBuffer> dataBuffer)
1497 {
1498 HILOGI("called.");
1499 if (!IsVaildChannel(channelId)) {
1500 return;
1501 }
1502
1503 if (IsStreamBytesChannel(channelId)) {
1504 HILOGE("is stream bytes channel, no need to send.");
1505 return;
1506 }
1507 // bi-channel need wait
1508 {
1509 std::unique_lock<std::mutex> lock(connectionMutex_);
1510 bool isConnected = connectionCondition_.wait_for(
1511 lock,
1512 std::chrono::seconds(WAIT_FOR_CONNECT),
1513 [this]() { return sessionStatus_ == SessionStatus::CONNECTED; }
1514 );
1515 if (!isConnected) {
1516 HILOGE("Wait for channel connection timed out after %{public}d seconds.", WAIT_FOR_CONNECT);
1517 return;
1518 }
1519 }
1520 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1521 {
1522 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1523 listener = sessionListener_;
1524 }
1525 if (listener != nullptr) {
1526 HILOGI("handler sessionListener");
1527 listener->OnData(sessionId_, dataBuffer);
1528 } else {
1529 EventCallbackInfo callbackInfo;
1530 callbackInfo.sessionId = sessionId_;
1531 callbackInfo.data = dataBuffer;
1532 ExeuteEventCallback(EVENT_RECEIVE_DATA, callbackInfo);
1533 }
1534 }
1535
OnError(int32_t channelId,const int32_t errorCode)1536 void AbilityConnectionSession::OnError(int32_t channelId, const int32_t errorCode)
1537 {
1538 HILOGI("error receive, channelId is %{public}d, errorCode is %{public}d", channelId, errorCode);
1539 if (!IsVaildChannel(channelId)) {
1540 return;
1541 }
1542 std::shared_ptr<IAbilityConnectionSessionListener> listener;
1543 {
1544 std::shared_lock<std::shared_mutex> lock(sessionListenerMutex_);
1545 listener = sessionListener_;
1546 }
1547 if (listener != nullptr) {
1548 HILOGI("handler sessionListener");
1549 listener->OnError(sessionId_, errorCode);
1550 } else {
1551 CollaborateEventInfo info;
1552 info.eventType = CollaborateEventType::SEND_FAILURE;
1553 info.sessionId = sessionId_;
1554 ExeuteEventCallback(EVENT_COLLABORATE, info);
1555 }
1556 }
1557
IsStreamBytesChannel(const int32_t channelId)1558 bool AbilityConnectionSession::IsStreamBytesChannel(const int32_t channelId)
1559 {
1560 TransChannelInfo transChannelInfo;
1561 int32_t ret = GetTransChannelInfo(TransChannelType::STREAM_BYTES, transChannelInfo);
1562 if (ret != ERR_OK) {
1563 HILOGE("stream bytes channel not exit!");
1564 return false;
1565 }
1566
1567 return transChannelInfo.channelId == channelId;
1568 }
1569
IsVaildChannel(const int32_t channelId)1570 bool AbilityConnectionSession::IsVaildChannel(const int32_t channelId)
1571 {
1572 HILOGD("called");
1573 std::shared_lock<std::shared_mutex> channelReadLock(transChannelMutex_);
1574 if (transChannels_.empty()) {
1575 HILOGE("transChannels_ is empty");
1576 return false;
1577 }
1578
1579 for (auto& iter : transChannels_) {
1580 if (iter.second.channelId == channelId) {
1581 return true;
1582 }
1583 }
1584
1585 return false;
1586 }
1587
SetTimeOut(int32_t time)1588 void AbilityConnectionSession::SetTimeOut(int32_t time)
1589 {
1590 HILOGI("called.");
1591 auto func = [this]() {
1592 Release();
1593 };
1594 if (eventHandler_ == nullptr) {
1595 HILOGE("eventHandler_ is nullptr");
1596 return;
1597 }
1598 eventHandler_->PostTask(func, CONNECT_SESSION_TIMEOUT_END_TASK, time);
1599 }
1600
RemoveTimeout()1601 void AbilityConnectionSession::RemoveTimeout()
1602 {
1603 HILOGI("called.");
1604 if (eventHandler_ == nullptr) {
1605 HILOGE("eventHandler_ is nullptr");
1606 return;
1607 }
1608 eventHandler_->RemoveTask(CONNECT_SESSION_TIMEOUT_END_TASK);
1609 }
1610
IsConnecting()1611 bool AbilityConnectionSession::IsConnecting()
1612 {
1613 std::shared_lock<std::shared_mutex> sessionStatusReadLock(sessionMutex_);
1614 return sessionStatus_ == SessionStatus::CONNECTING;
1615 }
1616
IsConnected()1617 bool AbilityConnectionSession::IsConnected()
1618 {
1619 std::shared_lock<std::shared_mutex> sessionStatusReadLock(sessionMutex_);
1620 return sessionStatus_ == SessionStatus::CONNECTED;
1621 }
1622
FinishSessionConnect()1623 void AbilityConnectionSession::FinishSessionConnect()
1624 {
1625 HILOGI("finish %{public}d connect callback", sessionId_);
1626 std::lock_guard<std::shared_mutex> sessionStatusLock(sessionMutex_);
1627 sessionStatus_ = SessionStatus::CONNECTED;
1628 connectionCondition_.notify_all();
1629 }
1630
ExeuteConnectCallback(const ConnectResult & result)1631 void AbilityConnectionSession::ExeuteConnectCallback(const ConnectResult& result)
1632 {
1633 HILOGI("called.");
1634 if (eventHandler_ == nullptr) {
1635 HILOGE("eventHandler_ is nullptr");
1636 return;
1637 }
1638
1639 auto task = [this, result]() {
1640 HILOGI("execute connect callback task.");
1641 if (connectCallback_ == nullptr) {
1642 HILOGE("connect callback is nullptr.");
1643 return;
1644 }
1645 // move ownership and set nullptr
1646 auto callback = std::move(connectCallback_);
1647 connectCallback_ = nullptr;
1648 callback(result);
1649 if (!result.isConnected) {
1650 Release();
1651 }
1652 };
1653 eventHandler_->PostTask(task,
1654 "ExeuteConnectCallback", 0, AppExecFwk::EventQueue::Priority::IMMEDIATE);
1655 }
1656
CollabChannelListener(const std::shared_ptr<AbilityConnectionSession> & abilityConnectionSession)1657 AbilityConnectionSession::CollabChannelListener::CollabChannelListener(
1658 const std::shared_ptr<AbilityConnectionSession>& abilityConnectionSession)
1659 : abilityConnectionSession_(abilityConnectionSession)
1660 {
1661 }
1662
OnConnect(const int32_t channelId) const1663 void AbilityConnectionSession::CollabChannelListener::OnConnect(const int32_t channelId) const
1664 {
1665 HILOGI("called.");
1666 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1667 if (abilityConnectionSession == nullptr) {
1668 HILOGE("abilityConnectionSession is null");
1669 return;
1670 }
1671
1672 abilityConnectionSession->OnChannelConnect(channelId);
1673 }
1674
OnDisConnect(const int32_t channelId,const ShutdownReason & reason) const1675 void AbilityConnectionSession::CollabChannelListener::OnDisConnect(const int32_t channelId,
1676 const ShutdownReason& reason) const
1677 {
1678 HILOGI("called.");
1679 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1680 if (abilityConnectionSession == nullptr) {
1681 HILOGE("abilityConnectionSession is null");
1682 return;
1683 }
1684
1685 abilityConnectionSession->OnChannelClosed(channelId, reason);
1686 }
1687
OnMessage(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer) const1688 void AbilityConnectionSession::CollabChannelListener::OnMessage(const int32_t channelId,
1689 const std::shared_ptr<AVTransDataBuffer>& buffer) const
1690 {
1691 HILOGI("called.");
1692 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1693 if (abilityConnectionSession == nullptr) {
1694 HILOGE("abilityConnectionSession is null");
1695 return;
1696 }
1697
1698 abilityConnectionSession->OnMessageReceived(channelId, buffer);
1699 }
1700
OnBytes(const int32_t channelId,const std::shared_ptr<AVTransDataBuffer> & buffer) const1701 void AbilityConnectionSession::CollabChannelListener::OnBytes(const int32_t channelId,
1702 const std::shared_ptr<AVTransDataBuffer>& buffer) const
1703 {
1704 HILOGI("called.");
1705 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1706 if (abilityConnectionSession == nullptr) {
1707 HILOGE("abilityConnectionSession is null");
1708 return;
1709 }
1710
1711 abilityConnectionSession->OnBytesReceived(channelId, buffer);
1712 }
1713
OnStream(const int32_t channelId,const std::shared_ptr<AVTransStreamData> & sendData) const1714 void AbilityConnectionSession::CollabChannelListener::OnStream(const int32_t channelId,
1715 const std::shared_ptr<AVTransStreamData>& sendData) const
1716 {
1717 }
1718
OnError(const int32_t channelId,const int32_t errorCode) const1719 void AbilityConnectionSession::CollabChannelListener::OnError(const int32_t channelId, const int32_t errorCode) const
1720 {
1721 HILOGI("called.");
1722 if (auto abilityConnectionSession = abilityConnectionSession_.lock()) {
1723 abilityConnectionSession->OnError(channelId, errorCode);
1724 }
1725 }
1726
OnSendFile(const int32_t channelId,const FileInfo & info) const1727 void AbilityConnectionSession::CollabChannelListener::OnSendFile(const int32_t channelId, const FileInfo& info) const
1728 {
1729 HILOGI("called.");
1730 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1731 if (abilityConnectionSession == nullptr) {
1732 HILOGE("abilityConnectionSession is null");
1733 return;
1734 }
1735
1736 abilityConnectionSession->OnSendFile(channelId, info);
1737 }
1738
OnRecvFile(const int32_t channelId,const FileInfo & info) const1739 void AbilityConnectionSession::CollabChannelListener::OnRecvFile(const int32_t channelId, const FileInfo& info) const
1740 {
1741 HILOGI("called.");
1742 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1743 if (abilityConnectionSession == nullptr) {
1744 HILOGE("abilityConnectionSession is null");
1745 return;
1746 }
1747
1748 abilityConnectionSession->OnRecvFile(channelId, info);
1749 }
1750
GetRecvPath(const int32_t channelId) const1751 const char* AbilityConnectionSession::CollabChannelListener::GetRecvPath(const int32_t channelId) const
1752 {
1753 HILOGI("called.");
1754 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1755 if (abilityConnectionSession == nullptr) {
1756 HILOGE("abilityConnectionSession is null");
1757 return nullptr;
1758 }
1759
1760 return abilityConnectionSession->GetRecvPath(channelId);
1761 }
1762
PixelMapListener(const std::shared_ptr<AbilityConnectionSession> & session)1763 AbilityConnectionSession::PixelMapListener::PixelMapListener(
1764 const std::shared_ptr<AbilityConnectionSession>& session) : abilityConnectionSession_(session)
1765 {
1766 }
1767
OnRecvPixelMap(const std::shared_ptr<Media::PixelMap> & pixelMap)1768 void AbilityConnectionSession::PixelMapListener::OnRecvPixelMap(const std::shared_ptr<Media::PixelMap>& pixelMap)
1769 {
1770 HILOGI("called.");
1771 std::shared_ptr<AbilityConnectionSession> abilityConnectionSession = abilityConnectionSession_.lock();
1772 if (abilityConnectionSession == nullptr) {
1773 HILOGE("abilityConnectionSession is null");
1774 return;
1775 }
1776
1777 abilityConnectionSession->OnRecvPixelMap(pixelMap);
1778 }
1779
OnRecvSurfaceParam(const SurfaceParam & param)1780 void AbilityConnectionSession::PixelMapListener::OnRecvSurfaceParam(const SurfaceParam& param)
1781 {
1782 HILOGI("called.");
1783 }
1784 } // namespace DistributedCollab
1785 } // namespace OHOS