1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "wfd_rtp_consumer.h"
17 #include <chrono>
18 #include "extend/magic_enum/magic_enum.hpp"
19 #include "common/reflect_registration.h"
20 #include "configuration/include/config.h"
21 #include "event_comm.h"
22 #include "protocol/frame/h264_frame.h"
23 #include "wfd_media_def.h"
24 #include "wfd_session_def.h"
25
26 namespace OHOS {
27 namespace Sharing {
28
WfdRtpConsumer()29 WfdRtpConsumer::WfdRtpConsumer()
30 {
31 SHARING_LOGI("trace.");
32 }
33
~WfdRtpConsumer()34 WfdRtpConsumer::~WfdRtpConsumer()
35 {
36 SHARING_LOGI("wfd rtp consumer Id: %{public}u.", GetId());
37 Release();
38 }
39
HandleEvent(SharingEvent & event)40 int32_t WfdRtpConsumer::HandleEvent(SharingEvent &event)
41 {
42 SHARING_LOGD("trace.");
43 RETURN_INVALID_IF_NULL(event.eventMsg);
44
45 SHARING_LOGI("eventType: %{public}s, wfd rtp consumerId: %{public}u.",
46 std::string(magic_enum::enum_name(event.eventMsg->type)).c_str(), GetId());
47 switch (event.eventMsg->type) {
48 case EventType::EVENT_WFD_MEDIA_INIT:
49 HandleProsumerInitState(event);
50 break;
51 default:
52 SHARING_LOGI("none process case.");
53 break;
54 }
55
56 return 0;
57 }
58
HandleProsumerInitState(SharingEvent & event)59 void WfdRtpConsumer::HandleProsumerInitState(SharingEvent &event)
60 {
61 SHARING_LOGD("trace.");
62 RETURN_IF_NULL(event.eventMsg);
63
64 auto msg = ConvertEventMsg<WfdConsumerEventMsg>(event);
65 if (msg) {
66 port_ = msg->port;
67 if (msg->audioTrack.codecId != CodecId::CODEC_NONE) {
68 audioTrack_ = msg->audioTrack;
69 }
70 if (msg->videoTrack.codecId != CodecId::CODEC_NONE) {
71 videoTrack_ = msg->videoTrack;
72 }
73 }
74
75 isInit_ = true;
76
77 auto pPrivateMsg = std::make_shared<WfdSinkSessionEventMsg>();
78 pPrivateMsg->errorCode = ERR_OK;
79 pPrivateMsg->type = EVENT_AGENT_STATE_PROSUMER_INIT;
80 pPrivateMsg->toMgr = ModuleType::MODULE_CONTEXT;
81 pPrivateMsg->fromMgr = ModuleType::MODULE_MEDIACHANNEL;
82 pPrivateMsg->srcId = GetId();
83 pPrivateMsg->dstId = event.eventMsg->srcId;
84 contextId_ = event.eventMsg->srcId;
85 pPrivateMsg->agentId = GetSinkAgentId();
86 pPrivateMsg->prosumerId = GetId();
87
88 NotifyPrivateEvent(pPrivateMsg);
89 }
90
UpdateOperation(ProsumerStatusMsg::Ptr & statusMsg)91 void WfdRtpConsumer::UpdateOperation(ProsumerStatusMsg::Ptr &statusMsg)
92 {
93 SHARING_LOGD("trace.");
94 RETURN_IF_NULL(statusMsg);
95
96 SHARING_LOGD("status: %{public}s consumerId: %{public}u.",
97 std::string(magic_enum::enum_name(static_cast<ProsumerOptRunningStatus>(statusMsg->status))).c_str(),
98 GetId());
99 switch (statusMsg->status) {
100 case PROSUMER_INIT:
101 if (Init()) {
102 statusMsg->status = PROSUMER_NOTIFY_INIT_SUCCESS;
103 } else {
104 statusMsg->status = PROSUMER_NOTIFY_INIT_SUCCESS;
105 statusMsg->errorCode = ERR_PROSUMER_CREATE;
106 }
107 break;
108 case PROSUMER_START:
109 if (isInit_ && Start()) {
110 statusMsg->status = PROSUMER_NOTIFY_START_SUCCESS;
111 } else {
112 statusMsg->status = PROSUMER_NOTIFY_ERROR;
113 statusMsg->errorCode = ERR_PROSUMER_START;
114 }
115 break;
116 case PROSUMER_PAUSE:
117 isPaused_ = true;
118 mediaTypePaused_ = statusMsg->mediaType;
119 statusMsg->status = PROSUMER_NOTIFY_PAUSE_SUCCESS;
120 break;
121 case PROSUMER_RESUME:
122 isPaused_ = false;
123 statusMsg->status = PROSUMER_NOTIFY_RESUME_SUCCESS;
124 break;
125 case PROSUMER_STOP:
126 Stop();
127 statusMsg->status = PROSUMER_NOTIFY_STOP_SUCCESS;
128 break;
129 case PROSUMER_DESTROY:
130 Release();
131 statusMsg->status = PROSUMER_NOTIFY_DESTROY_SUCCESS;
132 break;
133 default:
134 SHARING_LOGI("none process case.");
135 break;
136 }
137
138 Notify(statusMsg);
139 }
140
Init()141 bool WfdRtpConsumer::Init()
142 {
143 SHARING_LOGD("trace.");
144 return InitRtpUnpacker();
145 }
146
Start()147 bool WfdRtpConsumer::Start()
148 {
149 SHARING_LOGD("trace.");
150 if (!StartNetworkServer(port_, rtpServer_.second, rtpServer_.first)) {
151 SHARING_LOGE("start rtp server port: %{public}d failed.", port_);
152 return false;
153 }
154
155 SHARING_LOGD("start reveiver server success.");
156 isRunning_ = true;
157 return true;
158 }
159
Stop()160 bool WfdRtpConsumer::Stop()
161 {
162 SHARING_LOGD("trace.");
163 isRunning_ = false;
164 if (rtpServer_.second) {
165 rtpServer_.second->Stop();
166 rtpServer_.second.reset();
167 }
168
169 if (rtpUnpacker_) {
170 rtpUnpacker_->Release();
171 rtpUnpacker_.reset();
172 }
173
174 return true;
175 }
176
Release()177 int32_t WfdRtpConsumer::Release()
178 {
179 SHARING_LOGD("WFDRtpConsumerId: %{public}u.", GetId());
180 auto end = std::chrono::steady_clock::now();
181 std::chrono::duration<double, std::milli> diff = end - gopInterval_;
182
183 SHARING_LOGD(
184 "TEST STATISTIC Miracast:finish, interval:%{public}.0f ms, agent ID:%{public}d, "
185 "get video frame, gop:%{public}d, average receiving frames time:%{public}.0f ms.",
186 diff.count(), GetSinkAgentId(), frameNums_, diff.count() / frameNums_);
187
188 Stop();
189 return 0;
190 }
191
InitRtpUnpacker()192 bool WfdRtpConsumer::InitRtpUnpacker()
193 {
194 SHARING_LOGD("trace.");
195 RtpPlaylodParam arpp = {33, 90000, RtpPayloadStream::MPEG2_TS}; // 33 : ts rtp payload, 90000 : sampe rate
196 rtpUnpacker_ = RtpFactory::CreateRtpUnpack(arpp);
197 if (rtpUnpacker_ != nullptr) {
198 // data callback
199 rtpUnpacker_->SetOnRtpUnpack(
200 std::bind(&WfdRtpConsumer::OnRtpUnpackCallback, this, std::placeholders::_1, std::placeholders::_2));
201 // notify callback
202 rtpUnpacker_->SetOnRtpNotify(std::bind(&WfdRtpConsumer::OnRtpUnpackNotify, this, std::placeholders::_1));
203 } else {
204 SHARING_LOGE("wfd init rtp unpacker failed.");
205 return false;
206 }
207
208 return true;
209 }
210
OnRtpUnpackNotify(int32_t errCode)211 void WfdRtpConsumer::OnRtpUnpackNotify(int32_t errCode)
212 {
213 SHARING_LOGD("errCode: %{public}d.", errCode);
214 }
215
OnRtpUnpackCallback(uint32_t ssrc,const Frame::Ptr & frame)216 void WfdRtpConsumer::OnRtpUnpackCallback(uint32_t ssrc, const Frame::Ptr &frame)
217 {
218 MEDIA_LOGD("trace.");
219 RETURN_IF_NULL(frame);
220 auto listener = listener_.lock();
221 RETURN_IF_NULL(listener);
222
223 auto dispatcher = listener->GetDispatcher();
224 RETURN_IF_NULL(dispatcher);
225
226 MediaData::Ptr mediaData;
227 if (frame->GetTrackType() == TRACK_AUDIO) {
228 if (isPaused_ && (mediaTypePaused_ == MEDIA_TYPE_AUDIO || mediaTypePaused_ == MEDIA_TYPE_AV)) {
229 return;
230 }
231
232 mediaData = std::make_shared<MediaData>();
233 mediaData->isRaw = false;
234 mediaData->keyFrame = false;
235 mediaData->mediaType = MEDIA_TYPE_AUDIO;
236 mediaData->buff = frame;
237 mediaData->pts = frame->Pts();
238 MEDIA_LOGD("audio & put it into dispatcher: %{public}u, consumerId: %{public}u.", dispatcher->GetDispatcherId(),
239 GetId());
240
241 dispatcher->InputData(mediaData);
242 } else if (frame->GetTrackType() == TRACK_VIDEO) {
243 if (isPaused_ && (mediaTypePaused_ == MEDIA_TYPE_VIDEO || mediaTypePaused_ == MEDIA_TYPE_AV)) {
244 return;
245 }
246
247 auto p = frame->Data();
248 p = *(p + 2) == 0x01 ? p + 3 : p + 4; // 2: fix offset, 3: fix offset, 4: fix offset
249 if (((p[0]) & 0x1f) == 0x01) {
250 mediaData = std::make_shared<MediaData>();
251 mediaData->mediaType = MEDIA_TYPE_VIDEO;
252 mediaData->isRaw = false;
253 mediaData->keyFrame = false;
254 mediaData->buff = frame;
255 mediaData->pts = frame->Pts();
256
257 dispatcher->InputData(mediaData);
258 frameNums_++;
259 } else {
260 SplitH264((char *)frame->Data(), frame->Size(), 0, [&](const char *buf, size_t len, size_t prefix) {
261 if ((*(buf + prefix) & 0x1f) == 0x06) {
262 // discard the SEI data
263 return;
264 }
265
266 if ((*(buf + prefix) & 0x1f) == 0x07) {
267 auto spsOld = dispatcher->GetSPS();
268 if (spsOld != nullptr && spsOld->buff != nullptr) {
269 return;
270 }
271
272 // set sps into buffer dispathcer
273 auto sps = std::make_shared<MediaData>();
274 sps->buff = std::make_shared<DataBuffer>();
275 sps->mediaType = MEDIA_TYPE_VIDEO;
276 sps->buff->Assign((char *)buf, len);
277
278 dispatcher->SetSpsNalu(sps);
279 return;
280 }
281 if ((*(buf + prefix) & 0x1f) == 0x08) {
282 auto ppsOld = dispatcher->GetPPS();
283 if (ppsOld != nullptr && ppsOld->buff != nullptr) {
284 return;
285 }
286
287 // set pps into buffer dispather
288 auto pps = std::make_shared<MediaData>();
289 pps->buff = std::make_shared<DataBuffer>();
290 pps->mediaType = MEDIA_TYPE_VIDEO;
291 pps->buff->Assign((char *)buf, len);
292
293 dispatcher->SetPpsNalu(pps);
294 return;
295 }
296 auto mediaData = std::make_shared<MediaData>();
297 mediaData->buff = std::make_shared<DataBuffer>();
298 mediaData->mediaType = MEDIA_TYPE_VIDEO;
299 mediaData->isRaw = false;
300 // i-frame is key frame
301 mediaData->keyFrame = (*(buf + prefix) & 0x1f) == 0x05 ? true : false;
302
303 if (mediaData->keyFrame) {
304 if (isFirstKeyFrame_) {
305 MEDIA_LOGD("TEST STATISTICS Miracast:first, agent ID:%{public}d, get video frame.",
306 GetSinkAgentId());
307 isFirstKeyFrame_ = false;
308 } else {
309 auto end = std::chrono::steady_clock::now();
310 std::chrono::duration<double, std::milli> diff = end - gopInterval_;
311 MEDIA_LOGD("TEST STATISTIC Miracast:interval:%{public}.0f ms, "
312 "agent ID:%{public}d, get video frame, gop:%{public}d, "
313 "average receiving frames time:%{public}.0f ms.",
314 diff.count(), GetSinkAgentId(), frameNums_, diff.count() / frameNums_);
315 }
316
317 frameNums_ = 1;
318 gopInterval_ = std::chrono::steady_clock::now();
319 }
320
321 mediaData->buff->ReplaceData((char *)buf, len);
322 mediaData->pts = frame->Pts();
323
324 dispatcher->InputData(mediaData);
325 });
326 }
327 }
328 }
329
OnServerReadData(int32_t fd,DataBuffer::Ptr buf,INetworkSession::Ptr sesssion)330 void WfdRtpConsumer::OnServerReadData(int32_t fd, DataBuffer::Ptr buf, INetworkSession::Ptr sesssion)
331 {
332 if (rtpUnpacker_ != nullptr && isRunning_) {
333 rtpUnpacker_->ParseRtp(buf->Peek(), buf->Size());
334 if (isFirstPacket_) {
335 SHARING_LOGD("TEST STATISTICS Miracast:first, agent ID:%{public}d, recv first packet.", GetSinkAgentId());
336 isFirstPacket_ = false;
337 }
338 }
339 }
340
StartNetworkServer(uint16_t port,NetworkFactory::ServerPtr & server,int32_t & fd)341 bool WfdRtpConsumer::StartNetworkServer(uint16_t port, NetworkFactory::ServerPtr &server, int32_t &fd)
342 {
343 SHARING_LOGD("trace.");
344 if (!NetworkFactory::CreateUdpServer(port, "0.0.0.0", shared_from_this(), server)) {
345 server.reset();
346 return false;
347 }
348
349 fd = server->GetSocketInfo()->GetLocalFd();
350 return true;
351 }
352
353 REGISTER_CLASS_REFLECTOR(WfdRtpConsumer);
354 } // namespace Sharing
355 } // namespace OHOS
356