• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "wfd_rtp_consumer.h"
17 #include <chrono>
18 #include "extend/magic_enum/magic_enum.hpp"
19 #include "common/reflect_registration.h"
20 #include "configuration/include/config.h"
21 #include "event_comm.h"
22 #include "protocol/frame/h264_frame.h"
23 #include "wfd_media_def.h"
24 #include "wfd_session_def.h"
25 
26 namespace OHOS {
27 namespace Sharing {
28 
WfdRtpConsumer()29 WfdRtpConsumer::WfdRtpConsumer()
30 {
31     SHARING_LOGI("trace.");
32 }
33 
~WfdRtpConsumer()34 WfdRtpConsumer::~WfdRtpConsumer()
35 {
36     SHARING_LOGI("wfd rtp consumer Id: %{public}u.", GetId());
37     Release();
38 }
39 
HandleEvent(SharingEvent & event)40 int32_t WfdRtpConsumer::HandleEvent(SharingEvent &event)
41 {
42     SHARING_LOGD("trace.");
43     RETURN_INVALID_IF_NULL(event.eventMsg);
44 
45     SHARING_LOGI("eventType: %{public}s, wfd rtp consumerId: %{public}u.",
46                  std::string(magic_enum::enum_name(event.eventMsg->type)).c_str(), GetId());
47     switch (event.eventMsg->type) {
48         case EventType::EVENT_WFD_MEDIA_INIT:
49             HandleProsumerInitState(event);
50             break;
51         default:
52             SHARING_LOGI("none process case.");
53             break;
54     }
55 
56     return 0;
57 }
58 
HandleProsumerInitState(SharingEvent & event)59 void WfdRtpConsumer::HandleProsumerInitState(SharingEvent &event)
60 {
61     SHARING_LOGD("trace.");
62     RETURN_IF_NULL(event.eventMsg);
63 
64     auto msg = ConvertEventMsg<WfdConsumerEventMsg>(event);
65     if (msg) {
66         port_ = msg->port;
67         if (msg->audioTrack.codecId != CodecId::CODEC_NONE) {
68             audioTrack_ = msg->audioTrack;
69         }
70         if (msg->videoTrack.codecId != CodecId::CODEC_NONE) {
71             videoTrack_ = msg->videoTrack;
72         }
73     }
74 
75     isInit_ = true;
76 
77     auto pPrivateMsg = std::make_shared<WfdSinkSessionEventMsg>();
78     pPrivateMsg->errorCode = ERR_OK;
79     pPrivateMsg->type = EVENT_AGENT_STATE_PROSUMER_INIT;
80     pPrivateMsg->toMgr = ModuleType::MODULE_CONTEXT;
81     pPrivateMsg->fromMgr = ModuleType::MODULE_MEDIACHANNEL;
82     pPrivateMsg->srcId = GetId();
83     pPrivateMsg->dstId = event.eventMsg->srcId;
84     contextId_ = event.eventMsg->srcId;
85     pPrivateMsg->agentId = GetSinkAgentId();
86     pPrivateMsg->prosumerId = GetId();
87 
88     NotifyPrivateEvent(pPrivateMsg);
89 }
90 
UpdateOperation(ProsumerStatusMsg::Ptr & statusMsg)91 void WfdRtpConsumer::UpdateOperation(ProsumerStatusMsg::Ptr &statusMsg)
92 {
93     SHARING_LOGD("trace.");
94     RETURN_IF_NULL(statusMsg);
95 
96     SHARING_LOGD("status: %{public}s consumerId: %{public}u.",
97                  std::string(magic_enum::enum_name(static_cast<ProsumerOptRunningStatus>(statusMsg->status))).c_str(),
98                  GetId());
99     switch (statusMsg->status) {
100         case PROSUMER_INIT:
101             if (Init()) {
102                 statusMsg->status = PROSUMER_NOTIFY_INIT_SUCCESS;
103             } else {
104                 statusMsg->status = PROSUMER_NOTIFY_INIT_SUCCESS;
105                 statusMsg->errorCode = ERR_PROSUMER_CREATE;
106             }
107             break;
108         case PROSUMER_START:
109             if (isInit_ && Start()) {
110                 statusMsg->status = PROSUMER_NOTIFY_START_SUCCESS;
111             } else {
112                 statusMsg->status = PROSUMER_NOTIFY_ERROR;
113                 statusMsg->errorCode = ERR_PROSUMER_START;
114             }
115             break;
116         case PROSUMER_PAUSE:
117             isPaused_ = true;
118             mediaTypePaused_ = statusMsg->mediaType;
119             statusMsg->status = PROSUMER_NOTIFY_PAUSE_SUCCESS;
120             break;
121         case PROSUMER_RESUME:
122             isPaused_ = false;
123             statusMsg->status = PROSUMER_NOTIFY_RESUME_SUCCESS;
124             break;
125         case PROSUMER_STOP:
126             Stop();
127             statusMsg->status = PROSUMER_NOTIFY_STOP_SUCCESS;
128             break;
129         case PROSUMER_DESTROY:
130             Release();
131             statusMsg->status = PROSUMER_NOTIFY_DESTROY_SUCCESS;
132             break;
133         default:
134             SHARING_LOGI("none process case.");
135             break;
136     }
137 
138     Notify(statusMsg);
139 }
140 
Init()141 bool WfdRtpConsumer::Init()
142 {
143     SHARING_LOGD("trace.");
144     return InitRtpUnpacker();
145 }
146 
Start()147 bool WfdRtpConsumer::Start()
148 {
149     SHARING_LOGD("trace.");
150     if (!StartNetworkServer(port_, rtpServer_.second, rtpServer_.first)) {
151         SHARING_LOGE("start rtp server port: %{public}d failed.", port_);
152         return false;
153     }
154 
155     SHARING_LOGD("start reveiver server success.");
156     isRunning_ = true;
157     return true;
158 }
159 
Stop()160 bool WfdRtpConsumer::Stop()
161 {
162     SHARING_LOGD("trace.");
163     isRunning_ = false;
164     if (rtpServer_.second) {
165         rtpServer_.second->Stop();
166         rtpServer_.second.reset();
167     }
168 
169     if (rtpUnpacker_) {
170         rtpUnpacker_->Release();
171         rtpUnpacker_.reset();
172     }
173 
174     return true;
175 }
176 
Release()177 int32_t WfdRtpConsumer::Release()
178 {
179     SHARING_LOGD("WFDRtpConsumerId: %{public}u.", GetId());
180     auto end = std::chrono::steady_clock::now();
181     std::chrono::duration<double, std::milli> diff = end - gopInterval_;
182 
183     SHARING_LOGD(
184         "TEST STATISTIC Miracast:finish, interval:%{public}.0f ms, agent ID:%{public}d, "
185         "get video frame, gop:%{public}d, average receiving frames time:%{public}.0f ms.",
186         diff.count(), GetSinkAgentId(), frameNums_, diff.count() / frameNums_);
187 
188     Stop();
189     return 0;
190 }
191 
InitRtpUnpacker()192 bool WfdRtpConsumer::InitRtpUnpacker()
193 {
194     SHARING_LOGD("trace.");
195     RtpPlaylodParam arpp = {33, 90000, RtpPayloadStream::MPEG2_TS}; // 33 : ts rtp payload, 90000 : sampe rate
196     rtpUnpacker_ = RtpFactory::CreateRtpUnpack(arpp);
197     if (rtpUnpacker_ != nullptr) {
198         // data callback
199         rtpUnpacker_->SetOnRtpUnpack(
200             std::bind(&WfdRtpConsumer::OnRtpUnpackCallback, this, std::placeholders::_1, std::placeholders::_2));
201         // notify callback
202         rtpUnpacker_->SetOnRtpNotify(std::bind(&WfdRtpConsumer::OnRtpUnpackNotify, this, std::placeholders::_1));
203     } else {
204         SHARING_LOGE("wfd init rtp unpacker failed.");
205         return false;
206     }
207 
208     return true;
209 }
210 
OnRtpUnpackNotify(int32_t errCode)211 void WfdRtpConsumer::OnRtpUnpackNotify(int32_t errCode)
212 {
213     SHARING_LOGD("errCode: %{public}d.", errCode);
214 }
215 
OnRtpUnpackCallback(uint32_t ssrc,const Frame::Ptr & frame)216 void WfdRtpConsumer::OnRtpUnpackCallback(uint32_t ssrc, const Frame::Ptr &frame)
217 {
218     MEDIA_LOGD("trace.");
219     RETURN_IF_NULL(frame);
220     auto listener = listener_.lock();
221     RETURN_IF_NULL(listener);
222 
223     auto dispatcher = listener->GetDispatcher();
224     RETURN_IF_NULL(dispatcher);
225 
226     MediaData::Ptr mediaData;
227     if (frame->GetTrackType() == TRACK_AUDIO) {
228         if (isPaused_ && (mediaTypePaused_ == MEDIA_TYPE_AUDIO || mediaTypePaused_ == MEDIA_TYPE_AV)) {
229             return;
230         }
231 
232         mediaData = std::make_shared<MediaData>();
233         mediaData->isRaw = false;
234         mediaData->keyFrame = false;
235         mediaData->mediaType = MEDIA_TYPE_AUDIO;
236         mediaData->buff = frame;
237         mediaData->pts = frame->Pts();
238         MEDIA_LOGD("audio & put it into dispatcher: %{public}u, consumerId: %{public}u.", dispatcher->GetDispatcherId(),
239                    GetId());
240 
241         dispatcher->InputData(mediaData);
242     } else if (frame->GetTrackType() == TRACK_VIDEO) {
243         if (isPaused_ && (mediaTypePaused_ == MEDIA_TYPE_VIDEO || mediaTypePaused_ == MEDIA_TYPE_AV)) {
244             return;
245         }
246 
247         auto p = frame->Data();
248         p = *(p + 2) == 0x01 ? p + 3 : p + 4; // 2: fix offset, 3: fix offset, 4: fix offset
249         if (((p[0]) & 0x1f) == 0x01) {
250             mediaData = std::make_shared<MediaData>();
251             mediaData->mediaType = MEDIA_TYPE_VIDEO;
252             mediaData->isRaw = false;
253             mediaData->keyFrame = false;
254             mediaData->buff = frame;
255             mediaData->pts = frame->Pts();
256 
257             dispatcher->InputData(mediaData);
258             frameNums_++;
259         } else {
260             SplitH264((char *)frame->Data(), frame->Size(), 0, [&](const char *buf, size_t len, size_t prefix) {
261                 if ((*(buf + prefix) & 0x1f) == 0x06) {
262                     // discard the SEI data
263                     return;
264                 }
265 
266                 if ((*(buf + prefix) & 0x1f) == 0x07) {
267                     auto spsOld = dispatcher->GetSPS();
268                     if (spsOld != nullptr && spsOld->buff != nullptr) {
269                         return;
270                     }
271 
272                     // set sps into buffer dispathcer
273                     auto sps = std::make_shared<MediaData>();
274                     sps->buff = std::make_shared<DataBuffer>();
275                     sps->mediaType = MEDIA_TYPE_VIDEO;
276                     sps->buff->Assign((char *)buf, len);
277 
278                     dispatcher->SetSpsNalu(sps);
279                     return;
280                 }
281                 if ((*(buf + prefix) & 0x1f) == 0x08) {
282                     auto ppsOld = dispatcher->GetPPS();
283                     if (ppsOld != nullptr && ppsOld->buff != nullptr) {
284                         return;
285                     }
286 
287                     // set pps into buffer dispather
288                     auto pps = std::make_shared<MediaData>();
289                     pps->buff = std::make_shared<DataBuffer>();
290                     pps->mediaType = MEDIA_TYPE_VIDEO;
291                     pps->buff->Assign((char *)buf, len);
292 
293                     dispatcher->SetPpsNalu(pps);
294                     return;
295                 }
296                 auto mediaData = std::make_shared<MediaData>();
297                 mediaData->buff = std::make_shared<DataBuffer>();
298                 mediaData->mediaType = MEDIA_TYPE_VIDEO;
299                 mediaData->isRaw = false;
300                 // i-frame is key frame
301                 mediaData->keyFrame = (*(buf + prefix) & 0x1f) == 0x05 ? true : false;
302 
303                 if (mediaData->keyFrame) {
304                     if (isFirstKeyFrame_) {
305                         MEDIA_LOGD("TEST STATISTICS Miracast:first, agent ID:%{public}d, get video frame.",
306                                    GetSinkAgentId());
307                         isFirstKeyFrame_ = false;
308                     } else {
309                         auto end = std::chrono::steady_clock::now();
310                         std::chrono::duration<double, std::milli> diff = end - gopInterval_;
311                         MEDIA_LOGD("TEST STATISTIC Miracast:interval:%{public}.0f ms, "
312                             "agent ID:%{public}d, get video frame, gop:%{public}d, "
313                             "average receiving frames time:%{public}.0f ms.",
314                             diff.count(), GetSinkAgentId(), frameNums_, diff.count() / frameNums_);
315                     }
316 
317                     frameNums_ = 1;
318                     gopInterval_ = std::chrono::steady_clock::now();
319                 }
320 
321                 mediaData->buff->ReplaceData((char *)buf, len);
322                 mediaData->pts = frame->Pts();
323 
324                 dispatcher->InputData(mediaData);
325             });
326         }
327     }
328 }
329 
OnServerReadData(int32_t fd,DataBuffer::Ptr buf,INetworkSession::Ptr sesssion)330 void WfdRtpConsumer::OnServerReadData(int32_t fd, DataBuffer::Ptr buf, INetworkSession::Ptr sesssion)
331 {
332     if (rtpUnpacker_ != nullptr && isRunning_) {
333         rtpUnpacker_->ParseRtp(buf->Peek(), buf->Size());
334         if (isFirstPacket_) {
335             SHARING_LOGD("TEST STATISTICS Miracast:first, agent ID:%{public}d, recv first packet.", GetSinkAgentId());
336             isFirstPacket_ = false;
337         }
338     }
339 }
340 
StartNetworkServer(uint16_t port,NetworkFactory::ServerPtr & server,int32_t & fd)341 bool WfdRtpConsumer::StartNetworkServer(uint16_t port, NetworkFactory::ServerPtr &server, int32_t &fd)
342 {
343     SHARING_LOGD("trace.");
344     if (!NetworkFactory::CreateUdpServer(port, "0.0.0.0", shared_from_this(), server)) {
345         server.reset();
346         return false;
347     }
348 
349     fd = server->GetSocketInfo()->GetLocalFd();
350     return true;
351 }
352 
353 REGISTER_CLASS_REFLECTOR(WfdRtpConsumer);
354 } // namespace Sharing
355 } // namespace OHOS
356