1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "wfd_sink_session.h"
17 #include <memory>
18 #include "common/common.h"
19 #include "common/common_macro.h"
20 #include "common/reflect_registration.h"
21 #include "common/sharing_log.h"
22 #include "extend/magic_enum/magic_enum.hpp"
23 #include "mediachannel/media_channel_def.h"
24 #include "utils/utils.h"
25 #include "wfd_media_def.h"
26 #include "wfd_message.h"
27
28 namespace OHOS {
29 namespace Sharing {
30 #define WFD_INTERACTION_TIMEOUT 10
31
32 #define WFD_TIMEOUT_5_SECOND 5
33 #define WFD_TIMEOUT_6_SECOND 6
34 #define WFD_KEEP_ALIVE_TIMEOUT_MIN 10
35 #define WFD_KEEP_ALIVE_TIMEOUT_DEFAULT 60
36
WfdSinkSession()37 WfdSinkSession::WfdSinkSession()
38 {
39 SHARING_LOGI("sessionId: %{public}u.", GetId());
40 }
41
~WfdSinkSession()42 WfdSinkSession::~WfdSinkSession()
43 {
44 SHARING_LOGI("sessionId: %{public}u.", GetId());
45
46 if (rtspClient_) {
47 connected_ = false;
48 rtspClient_->Disconnect();
49 rtspClient_.reset();
50 }
51
52 timeoutTimer_.reset();
53 keepAliveTimer_.reset();
54 }
55
HandleEvent(SharingEvent & event)56 int32_t WfdSinkSession::HandleEvent(SharingEvent &event)
57 {
58 SHARING_LOGD("trace.");
59 RETURN_INVALID_IF_NULL(event.eventMsg);
60 if (interrupting_ && status_ == SESSION_INTERRUPT) {
61 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
62 NotifySessionInterrupted();
63 return 0;
64 }
65
66 SHARING_LOGI("eventType: %{public}s, wfd sessionId: %{public}u.",
67 std::string(magic_enum::enum_name(event.eventMsg->type)).c_str(), GetId());
68 switch (event.eventMsg->type) {
69 case EventType::EVENT_SESSION_INIT:
70 HandleSessionInit(event);
71 break;
72 case EventType::EVENT_AGENT_STATE_PROSUMER_INIT:
73 HandleProsumerInitState(event);
74 break;
75 case EventType::EVENT_AGENT_KEYMODE_STOP:
76 case EventType::EVENT_WFD_REQUEST_IDR:
77 SendIDRRequest();
78 break;
79 case EventType::EVENT_SESSION_TEARDOWN:
80 SendM8Request();
81 break;
82 case EventType::EVENT_AGENT_STATE_WRITE_WARNING:
83 NotifyServiceError(ERR_INTAKE_TIMEOUT);
84 break;
85 default:
86 SHARING_LOGI("none process case.");
87 break;
88 }
89
90 if (interrupting_ && status_ == SESSION_INTERRUPT) {
91 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
92 NotifySessionInterrupted();
93 }
94
95 return 0;
96 }
97
HandleSessionInit(SharingEvent & event)98 void WfdSinkSession::HandleSessionInit(SharingEvent &event)
99 {
100 SHARING_LOGD("trace.");
101 RETURN_IF_NULL(event.eventMsg);
102 auto inputMsg = ConvertEventMsg<WfdSinkSessionEventMsg>(event);
103 if (inputMsg) {
104 remoteMac_ = inputMsg->mac;
105 remoteRtspIp_ = inputMsg->ip;
106 remoteRtspPort_ = inputMsg->remotePort != 0 ? inputMsg->remotePort : DEFAULT_WFD_CTRLPORT;
107 localRtpPort_ = inputMsg->localPort;
108
109 videoFormat_ = inputMsg->videoFormat;
110 audioFormat_ = inputMsg->audioFormat;
111 } else {
112 SHARING_LOGE("unknow event msg.");
113 }
114 }
115
HandleProsumerInitState(SharingEvent & event)116 void WfdSinkSession::HandleProsumerInitState(SharingEvent &event)
117 {
118 SHARING_LOGD("trace.");
119 RETURN_IF_NULL(event.eventMsg);
120
121 auto inputMsg = ConvertEventMsg<WfdSinkSessionEventMsg>(event);
122 auto statusMsg = std::make_shared<SessionStatusMsg>();
123 statusMsg->msg = std::make_shared<EventMsg>();
124 statusMsg->status = STATE_SESSION_ERROR;
125 statusMsg->msg->errorCode = ERR_SESSION_START;
126
127 if (inputMsg) {
128 statusMsg->msg->requestId = inputMsg->requestId;
129 if (inputMsg->errorCode == ERR_OK) {
130 statusMsg->status = STATE_SESSION_STARTED;
131 statusMsg->msg->errorCode = ERR_OK;
132 } else {
133 SHARING_LOGE("producer inited failed, producerId: %{public}u.", inputMsg->prosumerId);
134 }
135 } else {
136 SHARING_LOGE("producer inited failed: unknow msg.");
137 }
138
139 NotifyAgentSessionStatus(statusMsg);
140 }
141
UpdateOperation(SessionStatusMsg::Ptr & statusMsg)142 void WfdSinkSession::UpdateOperation(SessionStatusMsg::Ptr &statusMsg)
143 {
144 SHARING_LOGD("trace.");
145 RETURN_IF_NULL(statusMsg);
146 RETURN_IF_NULL(statusMsg->msg);
147
148 status_ = static_cast<SessionRunningStatus>(statusMsg->status);
149 SHARING_LOGI("status: %{public}s.", std::string(magic_enum::enum_name(status_)).c_str());
150 if (interrupting_ && status_ == SESSION_INTERRUPT) {
151 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
152 NotifySessionInterrupted();
153 return;
154 }
155
156 switch (status_) {
157 case SESSION_START: {
158 if (!StartWfdSession()) {
159 SHARING_LOGE("session start connection failed, sessionId: %{public}u.", GetId());
160 statusMsg->msg->errorCode = ERR_CONNECTION_FAILURE;
161 statusMsg->status = STATE_SESSION_ERROR;
162 break;
163 }
164 connected_ = true;
165 if (interrupting_ && status_ == SESSION_INTERRUPT) {
166 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
167 connected_ = false;
168 NotifySessionInterrupted();
169 }
170
171 return;
172 }
173 case SESSION_STOP:
174 statusMsg->status = STATE_SESSION_STOPED;
175 StopWfdSession();
176 break;
177 case SESSION_PAUSE:
178 statusMsg->status = STATE_SESSION_PAUSED;
179 break;
180 case SESSION_RESUME:
181 statusMsg->status = STATE_SESSION_RESUMED;
182 break;
183 case SESSION_DESTROY:
184 statusMsg->status = STATE_SESSION_DESTROYED;
185 break;
186 case SESSION_INTERRUPT:
187 interrupting_ = true;
188 return;
189 default:
190 SHARING_LOGI("none process case.");
191 break;
192 }
193
194 if (interrupting_ && (status_ != SESSION_STOP && status_ != SESSION_DESTROY)) {
195 if (status_ == SESSION_INTERRUPT) {
196 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
197 NotifySessionInterrupted();
198 }
199 SHARING_LOGW("session: %{public}u has been interrupted.", GetId());
200 return;
201 }
202
203 NotifyAgentSessionStatus(statusMsg);
204 }
205
UpdateMediaStatus(SessionStatusMsg::Ptr & statusMsg)206 void WfdSinkSession::UpdateMediaStatus(SessionStatusMsg::Ptr &statusMsg)
207 {
208 SHARING_LOGD("trace.");
209 RETURN_IF_NULL(statusMsg);
210 RETURN_IF_NULL(statusMsg->msg);
211 if (interrupting_ && status_ == SESSION_INTERRUPT) {
212 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
213 NotifySessionInterrupted();
214 return;
215 }
216
217 SHARING_LOGI("update media notify status: %{public}s.",
218 std::string(magic_enum::enum_name(static_cast<MediaNotifyStatus>(statusMsg->status))).c_str());
219 switch (statusMsg->status) {
220 case STATE_PROSUMER_CREATE_SUCCESS:
221 NotifyProsumerInit(statusMsg);
222 break;
223 case STATE_PROSUMER_START_SUCCESS:
224 break;
225 case STATE_PROSUMER_STOP_SUCCESS:
226 break;
227 case STATE_PROSUMER_DESTROY_SUCCESS:
228 break;
229 case STATE_PROSUMER_RESUME_SUCCESS:
230 SendIDRRequest();
231 break;
232 default:
233 SHARING_LOGI("none process case.");
234 break;
235 }
236
237 if (interrupting_ && status_ == SESSION_INTERRUPT) {
238 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
239 NotifySessionInterrupted();
240 }
241 }
242
NotifyProsumerInit(SessionStatusMsg::Ptr & statusMsg)243 void WfdSinkSession::NotifyProsumerInit(SessionStatusMsg::Ptr &statusMsg)
244 {
245 SHARING_LOGD("trace.");
246 RETURN_IF_NULL(statusMsg);
247 RETURN_IF_NULL(statusMsg->msg);
248
249 auto eventMsg = std::make_shared<WfdConsumerEventMsg>();
250 eventMsg->type = EventType::EVENT_WFD_MEDIA_INIT;
251 eventMsg->toMgr = ModuleType::MODULE_MEDIACHANNEL;
252 eventMsg->port = localRtpPort_;
253
254 Common::SetVideoTrack(eventMsg->videoTrack, videoFormat_);
255 Common::SetAudioTrack(eventMsg->audioTrack, audioFormat_);
256
257 statusMsg->msg = std::move(eventMsg);
258 statusMsg->status = NOTIFY_SESSION_PRIVATE_EVENT;
259
260 NotifyAgentSessionStatus(statusMsg);
261 }
262
NotifyAgentSessionStatus(SessionStatusMsg::Ptr & statusMsg)263 void WfdSinkSession::NotifyAgentSessionStatus(SessionStatusMsg::Ptr &statusMsg)
264 {
265 SHARING_LOGD("trace.");
266 RETURN_IF_NULL(statusMsg);
267 if (statusMsg->status == NOTIFY_PROSUMER_CREATE) {
268 statusMsg->className = "WfdRtpConsumer";
269 }
270
271 Notify(statusMsg);
272 }
273
StartWfdSession()274 bool WfdSinkSession::StartWfdSession()
275 {
276 SHARING_LOGD("trace.");
277 if (NetworkFactory::CreateTcpClient(remoteRtspIp_, remoteRtspPort_, shared_from_this(), rtspClient_)) {
278 SHARING_LOGI("sessionId: %{public}u, wfds session connected ip: %{public}s.", GetId(),
279 GetAnonyString(remoteRtspIp_).c_str());
280 } else {
281 // try connect again
282 for (int32_t i = 0; i < 5; i++) { // 5: retry time
283 if (interrupting_) {
284 if (status_ == SESSION_INTERRUPT) {
285 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
286 NotifySessionInterrupted();
287 }
288 SHARING_LOGW("session: %{public}u has been interrupted.", GetId());
289 return false;
290 }
291
292 usleep(1000 * 200); // 1000 * 200: sleep 200ms
293 if (rtspClient_) {
294 if (rtspClient_->Connect(remoteRtspIp_, remoteRtspPort_, "::", 0)) {
295 SHARING_LOGW("sessionId: %{public}u, reconnected successfully, ip: %{public}s, times: %{public}d.",
296 GetId(), GetAnonyString(remoteRtspIp_).c_str(), i);
297 break;
298 } else {
299 SHARING_LOGE("sessionId: %{public}u, Failed to connect wfd rtsp server %{public}s:%{public}d.",
300 GetId(), GetAnonyString(remoteRtspIp_).c_str(), remoteRtspPort_);
301 if (i == 4) { // 4: stop try
302 return false;
303 }
304 }
305 }
306 }
307 }
308
309 timeoutTimer_ = std::make_unique<TimeoutTimer>();
310 timeoutTimer_->SetTimeoutCallback([this]() {
311 if (interrupting_) {
312 if (status_ == SESSION_INTERRUPT) {
313 NotifySessionInterrupted();
314 }
315 } else {
316 NotifyServiceError(ERR_PROTOCOL_INTERACTION_TIMEOUT);
317 }
318 });
319
320 timeoutTimer_->StartTimer(WFD_TIMEOUT_6_SECOND, "Waiting for WFD source M1/OPTIONS request");
321 return true;
322 }
323
StopWfdSession()324 bool WfdSinkSession::StopWfdSession()
325 {
326 SHARING_LOGI("sessionId: %{public}u.", GetId());
327 SendM8Request();
328 connected_ = false;
329 return true;
330 }
331
OnClientReadData(int32_t fd,DataBuffer::Ptr buf)332 void WfdSinkSession::OnClientReadData(int32_t fd, DataBuffer::Ptr buf)
333 {
334 SHARING_LOGD("trace.");
335 RETURN_IF_NULL(buf);
336 if (interrupting_) {
337 if (status_ == SESSION_INTERRUPT) {
338 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
339 NotifySessionInterrupted();
340 }
341 SHARING_LOGW("session: %{public}u has been interrupted.", GetId());
342 return;
343 }
344
345 std::string message(buf->Peek(), buf->Size());
346 RtspResponse response;
347 RtspRequest request;
348
349 SHARING_LOGD("sessionId: %{public}u, Recv WFD source message:\n%{public}s.", GetId(), message.c_str());
350 auto ret = response.Parse(message);
351 if (ret.code == RtspErrorType::OK) {
352 SHARING_LOGD("Recv RTSP Response message.");
353
354 int32_t incommingCSeq = response.GetCSeq();
355 auto funcIndex = responseHandlers_.find(incommingCSeq);
356 if (funcIndex != responseHandlers_.end() && funcIndex->second) {
357 if (!lastMessage_.empty()) {
358 lastMessage_.clear();
359 }
360
361 funcIndex->second(response, message);
362 responseHandlers_.erase(funcIndex);
363 } else {
364 SHARING_LOGE("Can't find response handler for cseq(%{public}d)", incommingCSeq);
365 return;
366 }
367
368 if (ret.info.size() > 2 && ret.info.back() == '$') { // 2: size of int
369 ret.info.pop_back();
370 SHARING_LOGW("*packet splicing need parse again\n%{public}s.", ret.info.c_str());
371 ret = request.Parse(ret.info);
372 if (ret.code == RtspErrorType::OK) {
373 SHARING_LOGD("Recv RTSP Request [method:%{public}s] message.", request.GetMethod().c_str());
374 HandleRequest(request, message);
375 return;
376 }
377 }
378
379 return;
380 }
381
382 ret = request.Parse(message);
383 if (ret.code == RtspErrorType::OK) {
384 SHARING_LOGD("Recv RTSP Request [method:%{public}s] message.", request.GetMethod().c_str());
385 HandleRequest(request, message);
386 return;
387 }
388
389 SHARING_LOGD("invalid WFD rtsp message.");
390 if (ret.code == RtspErrorType::INCOMPLETE_MESSAGE) {
391 lastMessage_ = message;
392 SHARING_LOGD("return with '%{public}s'.", ret.info.c_str());
393 return;
394 } else {
395 message = lastMessage_ + message;
396 ret = request.Parse(message);
397 if (ret.code == RtspErrorType::OK) {
398 SHARING_LOGD("spliced the Request message Successfully.");
399 HandleRequest(request, message);
400 return;
401 } else {
402 lastMessage_.clear();
403 return;
404 }
405 }
406 }
407
OnClientClose(int32_t fd)408 void WfdSinkSession::OnClientClose(int32_t fd)
409 {
410 SHARING_LOGD("sessionId: %{public}u, RTSP TCP Client socket close %{public}d.", GetId(), fd);
411 rtspClient_.reset();
412
413 if (interrupting_) {
414 if (status_ == SESSION_INTERRUPT) {
415 SHARING_LOGE("session: %{public}u, to notify be interrupted.", GetId());
416 NotifySessionInterrupted();
417 }
418 SHARING_LOGW("session: %{public}u has been interrupted.", GetId());
419 return;
420 }
421
422 if (wfdState_ < WfdSessionState::STOPPING) {
423 NotifyServiceError(ERR_NETWORK_ERROR);
424 }
425 }
426
HandleRequest(const RtspRequest & request,const std::string & message)427 void WfdSinkSession::HandleRequest(const RtspRequest &request, const std::string &message)
428 {
429 SHARING_LOGD("trace.");
430 if (!lastMessage_.empty()) {
431 lastMessage_.clear();
432 }
433
434 int32_t incomingCSeq = request.GetCSeq();
435 std::string rtspMethod = request.GetMethod();
436 if (rtspMethod == RTSP_METHOD_OPTIONS) {
437 // this is M1 request
438 SHARING_LOGD("Handle M1 request.");
439 timeoutTimer_->StopTimer();
440 SendM1Response(incomingCSeq);
441 SendM2Request();
442 return;
443 }
444
445 if (rtspMethod == RTSP_METHOD_GET_PARAMETER) {
446 std::list<std::string> params = request.GetBody();
447 if (!params.empty()) {
448 // this is M3 request
449 SHARING_LOGD("Handle M3 request.");
450 timeoutTimer_->StopTimer();
451 SendM3Response(incomingCSeq, params);
452 } else {
453 // M16: Keep-Alive message
454 SHARING_LOGD("Handle M16/Keep-Alive request.");
455 keepAliveTimer_->StopTimer();
456 SendCommonResponse(incomingCSeq);
457 keepAliveTimer_->StartTimer(keepAliveTimeout_,
458 "Waiting for WFD source M16/GET_PARAMETER KeepAlive request");
459 }
460
461 return;
462 }
463
464 if (rtspMethod == RTSP_METHOD_SET_PARAMETER) {
465 // this is M4 or M5 request
466 HandleSetParamRequest(request, message);
467 return;
468 }
469 SHARING_LOGE("RTSP Request [method:%{public}s] message shouldn't be here.", request.GetMethod().c_str());
470 }
471
HandleSetParamRequest(const RtspRequest & request,const std::string & message)472 void WfdSinkSession::HandleSetParamRequest(const RtspRequest &request, const std::string &message)
473 {
474 SHARING_LOGD("trace.");
475 int32_t incomingCSeq = request.GetCSeq();
476
477 std::list<std::string> paramSet = request.GetBody();
478 std::list<std::pair<std::string, std::string>> paramMap;
479 RtspCommon::SplitParameter(paramSet, paramMap);
480 if (paramMap.empty()) {
481 SHARING_LOGE("invalid SET_PARAMETER request without body.");
482 return;
483 }
484
485 // Triger request
486 auto it = std::find_if(paramMap.begin(), paramMap.end(),
487 [](std::pair<std::string, std::string> value) { return value.first == WFD_PARAM_TRIGGER; });
488 if (it != paramMap.end()) {
489 HandleTriggerMethod(incomingCSeq, it->second);
490 return;
491 }
492
493 // M4 request
494 if (timeoutTimer_) {
495 timeoutTimer_->StopTimer();
496 }
497
498 bool ret = HandleM4Request(message);
499 if (ret) {
500 SessionStatusMsg::Ptr statusMsg = std::make_shared<SessionStatusMsg>();
501 statusMsg->msg = std::make_shared<EventMsg>();
502 statusMsg->msg->requestId = 0;
503 statusMsg->msg->errorCode = ERR_OK;
504 statusMsg->status = NOTIFY_PROSUMER_CREATE;
505
506 NotifyAgentSessionStatus(statusMsg);
507 }
508
509 return;
510 }
511
HandleM2Response(const RtspResponse & response,const std::string & message)512 void WfdSinkSession::HandleM2Response(const RtspResponse &response, const std::string &message)
513 {
514 SHARING_LOGD("trace.");
515 if (timeoutTimer_) {
516 timeoutTimer_->StopTimer();
517 timeoutTimer_->StartTimer(WFD_TIMEOUT_6_SECOND, "Waiting for M3/GET_PARAMETER request");
518 }
519
520 if (response.GetStatus() != RTSP_STATUS_OK) {
521 SHARING_LOGE("WFD source peer handle 'OPTIONS' method error.");
522 NotifyServiceError();
523 return;
524 }
525
526 std::string publics = response.GetToken(RTSP_TOKEN_PUBLIC);
527 if (publics.empty() || publics.find(RTSP_METHOD_WFD) == std::string::npos ||
528 publics.find(RTSP_METHOD_SET_PARAMETER) == std::string::npos ||
529 publics.find(RTSP_METHOD_GET_PARAMETER) == std::string::npos ||
530 publics.find(RTSP_METHOD_SETUP) == std::string::npos || publics.find(RTSP_METHOD_PLAY) == std::string::npos ||
531 publics.find(RTSP_METHOD_TEARDOWN) == std::string::npos) {
532 SHARING_LOGE("WFD source peer do not support all methods.");
533 NotifyServiceError();
534 }
535 }
536
HandleM6Response(const RtspResponse & response,const std::string & message)537 void WfdSinkSession::HandleM6Response(const RtspResponse &response, const std::string &message)
538 {
539 SHARING_LOGD("trace.");
540 if (timeoutTimer_) {
541 timeoutTimer_->StopTimer();
542 }
543
544 if (response.GetStatus() != RTSP_STATUS_OK) {
545 SHARING_LOGE("WFD source peer handle 'SETUP' method error.");
546 NotifyServiceError();
547 return;
548 }
549
550 rtspSession_ = response.GetSession();
551 keepAliveTimeout_ = response.GetTimeout();
552 if (keepAliveTimeout_ < WFD_KEEP_ALIVE_TIMEOUT_MIN) {
553 keepAliveTimeout_ = WFD_KEEP_ALIVE_TIMEOUT_DEFAULT;
554 }
555
556 SendM7Request();
557 }
558
HandleM7Response(const RtspResponse & response,const std::string & message)559 void WfdSinkSession::HandleM7Response(const RtspResponse &response, const std::string &message)
560 {
561 SHARING_LOGD("trace.");
562
563 if (timeoutTimer_) {
564 timeoutTimer_->StopTimer();
565 }
566
567 if (response.GetStatus() != RTSP_STATUS_OK) {
568 SHARING_LOGE("WFD source peer handle 'PLAY' method error.");
569 NotifyServiceError();
570 return;
571 }
572
573 wfdState_ = WfdSessionState::PLAYING;
574 SHARING_LOGI("WFD RTSP PLAY ok, start receiver to recv the stream.");
575
576 // Cause the interaction has already been completed,
577 // then set a new callback for the next operations.
578 if (timeoutTimer_) {
579 timeoutTimer_->SetTimeoutCallback([this]() { NotifyServiceError(ERR_NETWORK_ERROR); });
580 }
581
582 keepAliveTimer_ = std::make_unique<TimeoutTimer>();
583 keepAliveTimer_->SetTimeoutCallback([this]() { NotifyServiceError(ERR_NETWORK_ERROR); });
584 keepAliveTimer_->StartTimer(keepAliveTimeout_, "Waiting for WFD source M16/GET_PARAMETER KeepAlive request");
585 }
586
HandleM8Response(const RtspResponse & response,const std::string & message)587 void WfdSinkSession::HandleM8Response(const RtspResponse &response, const std::string &message)
588 {
589 SHARING_LOGD("trace.");
590 if (response.GetStatus() != RTSP_STATUS_OK) {
591 SHARING_LOGE("WFD source peer handle 'TEARDOWN' method error.");
592 NotifyServiceError();
593 return;
594 }
595
596 // notify wfd scene rtsp teardown
597 auto statusMsg = std::make_shared<SessionStatusMsg>();
598 auto eventMsg = std::make_shared<WfdSceneEventMsg>();
599 eventMsg->type = EventType::EVENT_WFD_NOTIFY_RTSP_TEARDOWN;
600 eventMsg->toMgr = ModuleType::MODULE_INTERACTION;
601 eventMsg->mac = remoteMac_;
602 statusMsg->msg = std::move(eventMsg);
603 statusMsg->status = NOTIFY_SESSION_PRIVATE_EVENT;
604
605 NotifyAgentSessionStatus(statusMsg);
606 SHARING_LOGI("WFD RTSP TEARDOWN ok, stop recv the stream, disconnect socket.");
607 }
608
HandleCommonResponse(const RtspResponse & response,const std::string & message)609 void WfdSinkSession::HandleCommonResponse(const RtspResponse &response, const std::string &message)
610 {
611 SHARING_LOGD("trace.");
612
613 if (timeoutTimer_) {
614 timeoutTimer_->StopTimer();
615 }
616
617 if (response.GetStatus() != RTSP_STATUS_OK) {
618 SHARING_LOGI("WFD source peer handle method error.");
619 NotifyServiceError();
620 return;
621 }
622 }
623
SendM1Response(int32_t cseq)624 bool WfdSinkSession::SendM1Response(int32_t cseq)
625 {
626 SHARING_LOGD("trace.");
627 if (!rtspClient_) {
628 return false;
629 }
630
631 WfdRtspM1Response m1Response(cseq, RTSP_STATUS_OK);
632 std::string m1Res(m1Response.Stringify());
633
634 SHARING_LOGI("sessionId: %{public}d send M1 response, cseq: %{public}d.", GetId(), cseq);
635 bool ret = rtspClient_->Send(m1Res.data(), m1Res.length());
636 if (!ret) {
637 SHARING_LOGE("Failed to send M1 response, cseq: %{public}d.", cseq);
638 }
639
640 cseq_ = cseq;
641 return ret;
642 }
643
SendM2Request()644 bool WfdSinkSession::SendM2Request()
645 {
646 SHARING_LOGD("trace.");
647 if (!rtspClient_) {
648 return false;
649 }
650
651 WfdRtspM2Request m2Request(++cseq_);
652
653 responseHandlers_[cseq_] = [this](auto &&PH1, auto &&PH2) {
654 HandleM2Response(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
655 };
656 if (timeoutTimer_) {
657 timeoutTimer_->StartTimer(WFD_TIMEOUT_5_SECOND, "Waiting for M2/OPTIONS response");
658 }
659
660 SHARING_LOGI("sessionId: %{public}d send M2 request, cseq: %{public}d.", GetId(), cseq_);
661 std::string m2Req(m2Request.Stringify());
662 bool ret = rtspClient_->Send(m2Req.data(), m2Req.length());
663 if (!ret) {
664 SHARING_LOGE("Failed to send M2 request, cseq: %{public}d", cseq_);
665 responseHandlers_.erase(cseq_);
666 if (timeoutTimer_) {
667 timeoutTimer_->StopTimer();
668 }
669 NotifyServiceError();
670 return false;
671 }
672
673 return true;
674 }
675
SendM3Response(int32_t cseq,std::list<std::string> & params)676 bool WfdSinkSession::SendM3Response(int32_t cseq, std::list<std::string> ¶ms)
677 {
678 SHARING_LOGD("trace.");
679 if (!rtspClient_) {
680 return false;
681 }
682
683 if (params.empty()) {
684 return false;
685 }
686
687 WfdRtspM3Response m3Response(cseq, RTSP_STATUS_OK);
688 for (auto ¶m : params) {
689 if (param == WFD_PARAM_VIDEO_FORMATS) {
690 m3Response.SetVideoFormats(videoFormat_);
691 } else if (param == WFD_PARAM_AUDIO_CODECS) {
692 m3Response.SetAudioCodecs(AUDIO_48000_16_2);
693 } else if (param == WFD_PARAM_RTP_PORTS) {
694 m3Response.SetClientRtpPorts(localRtpPort_);
695 } else if (param == WFD_PARAM_CONTENT_PROTECTION) {
696 m3Response.SetContentProtection();
697 } else if (param == WFD_PARAM_COUPLED_SINK) {
698 m3Response.SetCoupledSink();
699 } else if (param == WFD_PARAM_UIBC_CAPABILITY) {
700 m3Response.SetUibcCapability();
701 } else if (param == WFD_PARAM_STANDBY_RESUME) {
702 m3Response.SetStandbyResumeCapability();
703 } else {
704 m3Response.SetCustomParam(param);
705 }
706 }
707
708 m3Response.SetCustomParam("wfd_connector_type", "5");
709 if (timeoutTimer_) {
710 timeoutTimer_->StartTimer(WFD_TIMEOUT_6_SECOND, "Waiting for M4/SET_PARAMETER request");
711 }
712
713 SHARING_LOGI("sessionId: %{public}d send M3 response, cseq: %{public}d.", GetId(), cseq);
714 std::string m3Req(m3Response.Stringify());
715 bool ret = rtspClient_->Send(m3Req.data(), m3Req.length());
716 if (!ret) {
717 SHARING_LOGE("Failed to send M3 response, cseq: %{public}d", cseq);
718 if (timeoutTimer_) {
719 timeoutTimer_->StopTimer();
720 }
721 NotifyServiceError();
722 return ret;
723 }
724
725 return 0;
726 }
727
HandleM4Request(const std::string & message)728 bool WfdSinkSession::HandleM4Request(const std::string &message)
729 {
730 SHARING_LOGD("trace.");
731
732 WfdRtspM4Request m4Request;
733 m4Request.Parse(message);
734 rtspUrl_ = m4Request.GetPresentationUrl();
735
736 int incomingCSeq = m4Request.GetCSeq();
737 if (timeoutTimer_) {
738 timeoutTimer_->StartTimer(WFD_TIMEOUT_6_SECOND, "Waiting for M5/SET_PARAMETER Triger request");
739 }
740 // Send M4 response
741 if (!SendCommonResponse(incomingCSeq)) {
742 if (timeoutTimer_) {
743 timeoutTimer_->StopTimer();
744 }
745 NotifyServiceError();
746 return false;
747 }
748
749 return true;
750 }
751
SendCommonResponse(int32_t cseq)752 bool WfdSinkSession::SendCommonResponse(int32_t cseq)
753 {
754 SHARING_LOGD("trace.");
755 if (!rtspClient_) {
756 return false;
757 }
758
759 RtspResponse response(cseq, RTSP_STATUS_OK);
760 std::string res(response.Stringify());
761
762 SHARING_LOGI("sessionId: %{public}d send common response, cseq: %{public}d.", GetId(), cseq);
763 bool ret = rtspClient_->Send(res.data(), res.length());
764 if (!ret) {
765 SHARING_LOGE("Failed to send common response.");
766 }
767
768 return ret;
769 }
770
HandleTriggerMethod(int32_t cseq,const std::string & method)771 bool WfdSinkSession::HandleTriggerMethod(int32_t cseq, const std::string &method)
772 {
773 SHARING_LOGD("trace.");
774 if (method == RTSP_METHOD_SETUP) {
775 // this is M5 request
776 if (timeoutTimer_) {
777 timeoutTimer_->StopTimer();
778 }
779 SendCommonResponse(cseq);
780 SendM6Request();
781 } else if (method == RTSP_METHOD_TEARDOWN) {
782 SHARING_LOGW("sessionId: %{public}u, receive Tearndown msg: %{public}s.", GetId(), method.c_str());
783 SendCommonResponse(cseq);
784 SendM8Request();
785 } else {
786 SHARING_LOGE("ignore UNSUPPORTED triggered method '%{public}s'.", method.c_str());
787 }
788
789 return true;
790 }
791
SendM6Request()792 bool WfdSinkSession::SendM6Request()
793 {
794 SHARING_LOGD("trace.");
795 if (!rtspClient_) {
796 return false;
797 }
798
799 WfdRtspM6Request m6Request(++cseq_, rtspUrl_);
800 m6Request.SetClientPort(localRtpPort_);
801
802 responseHandlers_[cseq_] = [this](auto &&PH1, auto &&PH2) {
803 HandleM6Response(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
804 };
805 if (timeoutTimer_) {
806 timeoutTimer_->StartTimer(WFD_TIMEOUT_5_SECOND, "Waiting for M6/SETUP response");
807 }
808
809 SHARING_LOGI("sessionId: %{public}d send M6 request, cseq: %{public}d.", GetId(), cseq_);
810 std::string m6Req(m6Request.Stringify());
811 bool ret = rtspClient_->Send(m6Req.data(), m6Req.length());
812 if (!ret) {
813 SHARING_LOGE("Failed to send M6 request, cseq: %{public}d.", cseq_);
814 responseHandlers_.erase(cseq_);
815 if (timeoutTimer_) {
816 timeoutTimer_->StopTimer();
817 }
818
819 NotifyServiceError();
820 return false;
821 }
822
823 wfdState_ = WfdSessionState::READY;
824 return ret;
825 }
826
SendM7Request()827 bool WfdSinkSession::SendM7Request()
828 {
829 SHARING_LOGD("trace.");
830 if (!rtspClient_) {
831 return false;
832 }
833
834 WfdRtspM7Request m7Request(++cseq_, rtspUrl_);
835 if (!rtspSession_.empty()) {
836 m7Request.SetSession(rtspSession_);
837 }
838
839 responseHandlers_[cseq_] = [this](auto &&PH1, auto &&PH2) {
840 HandleM7Response(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
841 };
842
843 if (timeoutTimer_) {
844 timeoutTimer_->StartTimer(WFD_TIMEOUT_5_SECOND, "Waiting for M7/PLAY response");
845 }
846 SHARING_LOGI("sessionId: %{public}d send M7 request, cseq: %{public}d.", GetId(), cseq_);
847 std::string m7Req(m7Request.Stringify());
848 bool ret = rtspClient_->Send(m7Req.data(), m7Req.length());
849 if (!ret) {
850 SHARING_LOGE("Failed to send M7 request cseq: %{public}d.", cseq_);
851 responseHandlers_.erase(cseq_);
852 if (timeoutTimer_) {
853 timeoutTimer_->StopTimer();
854 }
855 NotifyServiceError();
856 return false;
857 }
858
859 return ret;
860 }
861
SendM8Request()862 bool WfdSinkSession::SendM8Request()
863 {
864 SHARING_LOGD("trace.");
865 if (wfdState_ == WfdSessionState::STOPPING) {
866 SHARING_LOGI("already send m8 reqeust.");
867 return true;
868 }
869
870 if (!rtspClient_ || !connected_) {
871 SHARING_LOGW("client null or disconnecting.");
872 return false;
873 }
874
875 WfdRtspM8Request m8Request(++cseq_, rtspUrl_);
876 if (!rtspSession_.empty()) {
877 m8Request.SetSession(rtspSession_);
878 }
879
880 responseHandlers_[cseq_] = [this](auto &&PH1, auto &&PH2) {
881 HandleM8Response(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
882 };
883
884 SHARING_LOGI("sessionId: %{public}d send M8 request, cseq: %{public}d.", GetId(), cseq_);
885 std::string m8Req(m8Request.Stringify());
886 bool ret = rtspClient_->Send(m8Req.data(), m8Req.length());
887 if (!ret) {
888 SHARING_LOGE("sessionId: %{public}u, failed to send M8 request, cseq: %{public}d.", GetId(), cseq_);
889 responseHandlers_.erase(cseq_);
890 }
891
892 SHARING_LOGI("sessionId: %{public}u,Send TEARDOWN ok.", GetId());
893 wfdState_ = WfdSessionState::STOPPING;
894 return ret;
895 }
896
SendIDRRequest()897 bool WfdSinkSession::SendIDRRequest()
898 {
899 SHARING_LOGD("trace.");
900 if (wfdState_ != WfdSessionState::PLAYING) {
901 return false;
902 }
903
904 if (!rtspClient_) {
905 return false;
906 }
907
908 WfdRtspIDRRequest idrRequest(++cseq_, rtspUrl_);
909 if (!rtspSession_.empty()) {
910 idrRequest.SetSession(rtspSession_);
911 }
912
913 responseHandlers_[cseq_] = [this](auto &&PH1, auto &&PH2) {
914 HandleCommonResponse(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2));
915 };
916 if (timeoutTimer_) {
917 timeoutTimer_->StartTimer(WFD_TIMEOUT_6_SECOND, "Waiting for WFD SET_PARAMETER/wfd_idr_request response");
918 }
919
920 std::string idrReq(idrRequest.Stringify());
921 bool ret = rtspClient_->Send(idrReq.data(), idrReq.length());
922 if (!ret) {
923 SHARING_LOGE("Failed to send IDR request.");
924 }
925
926 return ret;
927 }
928
NotifyServiceError(SharingErrorCode errorCode)929 void WfdSinkSession::NotifyServiceError(SharingErrorCode errorCode)
930 {
931 SHARING_LOGD("trace.");
932 auto statusMsg = std::make_shared<SessionStatusMsg>();
933 statusMsg->msg = std::make_shared<EventMsg>();
934 statusMsg->status = STATE_SESSION_ERROR;
935 statusMsg->msg->requestId = 0;
936 statusMsg->msg->errorCode = errorCode;
937
938 NotifyAgentSessionStatus(statusMsg);
939 }
940
NotifySessionInterrupted()941 void WfdSinkSession::NotifySessionInterrupted()
942 {
943 SHARING_LOGD("trace.");
944 auto statusMsg = std::make_shared<SessionStatusMsg>();
945 statusMsg->status = STATE_SESSION_INTERRUPTED;
946
947 NotifyAgentSessionStatus(statusMsg);
948 }
949
950 REGISTER_CLASS_REFLECTOR(WfdSinkSession);
951 } // namespace Sharing
952 } // namespace OHOS
953