1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "wfd_rtp_producer.h"
17 #include <unistd.h>
18 #include "common/common_macro.h"
19 #include "common/reflect_registration.h"
20 #include "configuration/include/config.h"
21 #include "extend/magic_enum/magic_enum.hpp"
22 #include "protocol/frame/aac_frame.h"
23 #include "protocol/frame/h264_frame.h"
24 #include "utils/utils.h"
25 #include "wfd_media_def.h"
26 #include "wfd_session_def.h"
27
28 namespace OHOS {
29 namespace Sharing {
30
UdpClient(bool rtcp)31 WfdRtpProducer::UdpClient::UdpClient(bool rtcp) : rtcp_(rtcp) {}
32
~UdpClient()33 WfdRtpProducer::UdpClient::~UdpClient()
34 {
35 Release();
36 }
37
OnClientClose(int32_t fd)38 void WfdRtpProducer::UdpClient::OnClientClose(int32_t fd)
39 {
40 SHARING_LOGD("trace.");
41 (void)fd;
42 }
43
OnClientWriteable(int32_t fd)44 void WfdRtpProducer::UdpClient::OnClientWriteable(int32_t fd)
45 {
46 SHARING_LOGD("trace.");
47 (void)fd;
48 }
49
OnClientException(int32_t fd)50 void WfdRtpProducer::UdpClient::OnClientException(int32_t fd)
51 {
52 SHARING_LOGD("trace.");
53 (void)fd;
54 }
55
OnClientConnect(bool isSuccess)56 void WfdRtpProducer::UdpClient::OnClientConnect(bool isSuccess)
57 {
58 SHARING_LOGD("trace.");
59 (void)isSuccess;
60 }
61
OnClientReadData(int32_t fd,DataBuffer::Ptr buf)62 void WfdRtpProducer::UdpClient::OnClientReadData(int32_t fd, DataBuffer::Ptr buf)
63 {
64 SHARING_LOGD("trace.");
65 auto listener = udpDataListener_.lock();
66 if (listener && rtcp_) {
67 listener->OnRtcpReadData(fd, buf);
68 }
69 }
70
SetUdpDataListener(std::weak_ptr<WfdRtpProducer> udpDataListener)71 void WfdRtpProducer::UdpClient::SetUdpDataListener(std::weak_ptr<WfdRtpProducer> udpDataListener)
72 {
73 SHARING_LOGD("trace.");
74 udpDataListener_ = udpDataListener;
75 }
76
Stop()77 void WfdRtpProducer::UdpClient::Stop()
78 {
79 SHARING_LOGD("trace.");
80 if (networkClientPtr_ != nullptr) {
81 networkClientPtr_->Disconnect();
82 networkClientPtr_.reset();
83 }
84 }
85
Release()86 void WfdRtpProducer::UdpClient::Release()
87 {
88 SHARING_LOGD("trace.");
89 Stop();
90 }
91
Connect(const std::string & peerIp,uint16_t peerPort,const std::string & localIp,uint16_t localPort)92 bool WfdRtpProducer::UdpClient::Connect(const std::string &peerIp, uint16_t peerPort, const std::string &localIp,
93 uint16_t localPort)
94 {
95 SHARING_LOGD("trace.");
96 return NetworkFactory::CreateUdpClient(peerIp, peerPort, localIp, localPort, shared_from_this(), networkClientPtr_);
97 }
98
SendDataBuffer(const DataBuffer::Ptr & buf)99 bool WfdRtpProducer::UdpClient::SendDataBuffer(const DataBuffer::Ptr &buf)
100 {
101 MEDIA_LOGD("trace.");
102 if (networkClientPtr_ && buf) {
103 return networkClientPtr_->Send(buf, buf->Size());
104 }
105 return false;
106 }
107
WfdRtpProducer()108 WfdRtpProducer::WfdRtpProducer()
109 {
110 SHARING_LOGD("ctor.");
111 }
112
~WfdRtpProducer()113 WfdRtpProducer::~WfdRtpProducer()
114 {
115 SHARING_LOGD("dtor producer Id: %{public}u.", GetId());
116 }
117
UpdateOperation(ProsumerStatusMsg::Ptr & statusMsg)118 void WfdRtpProducer::UpdateOperation(ProsumerStatusMsg::Ptr &statusMsg)
119 {
120 SHARING_LOGD("trace.");
121 RETURN_IF_NULL(statusMsg);
122 SHARING_LOGD("status: %{public}s producerId: %{public}u.",
123 std::string(magic_enum::enum_name(static_cast<ProsumerOptRunningStatus>(statusMsg->status))).c_str(),
124 GetId());
125 switch (statusMsg->status) {
126 case ProsumerOptRunningStatus::PROSUMER_INIT:
127 statusMsg->status = PROSUMER_NOTIFY_INIT_SUCCESS;
128 break;
129 case PROSUMER_START: {
130 isPaused_ = false;
131 if (isInit_ && (Connect() == 0)) {
132 statusMsg->status = PROSUMER_NOTIFY_START_SUCCESS;
133 } else {
134 statusMsg->status = PROSUMER_NOTIFY_ERROR;
135 statusMsg->errorCode = ERR_PROSUMER_START;
136 }
137 break;
138 }
139 case PROSUMER_PAUSE: {
140 isPaused_ = true;
141 statusMsg->status = PROSUMER_NOTIFY_PAUSE_SUCCESS;
142 break;
143 }
144
145 case PROSUMER_RESUME: {
146 isPaused_ = false;
147 statusMsg->status = PROSUMER_NOTIFY_RESUME_SUCCESS;
148 break;
149 }
150
151 case PROSUMER_STOP: {
152 if (Stop() == 0) {
153 statusMsg->status = PROSUMER_NOTIFY_STOP_SUCCESS;
154 } else {
155 statusMsg->status = PROSUMER_NOTIFY_ERROR;
156 statusMsg->errorCode = ERR_PROSUMER_STOP;
157 }
158 break;
159 }
160
161 case PROSUMER_DESTROY: {
162 Release();
163 statusMsg->status = PROSUMER_NOTIFY_DESTROY_SUCCESS;
164 break;
165 }
166
167 default:
168 break;
169 }
170 Notify(statusMsg);
171 }
172
PublishOneFrame(const MediaData::Ptr & mediaData)173 void WfdRtpProducer::PublishOneFrame(const MediaData::Ptr &mediaData)
174 {
175 MEDIA_LOGD("trace.");
176 RETURN_IF_NULL(mediaData);
177 RETURN_IF_NULL(mediaData->buff);
178 RETURN_IF_NULL(mediaData->buff->Peek());
179 auto buff = mediaData->buff;
180 if (isRunning_ && !isPaused_) { // paused check at this pos or in DispatchMediaData
181 if (mediaData->mediaType == MEDIA_TYPE_AUDIO) {
182 MEDIA_LOGD("audio frame pts:%{public}" PRId64 ".", mediaData->pts);
183 auto aacFrame = FrameImpl::CreateFrom(std::move(*buff));
184 aacFrame->codecId_ = CodecId::CODEC_AAC;
185 aacFrame->dts_ = aacFrame->pts_ = static_cast<uint32_t>(mediaData->pts);
186 tsPacker_->InputFrame(aacFrame);
187 } else if (mediaData->mediaType == MEDIA_TYPE_VIDEO) {
188 MEDIA_LOGD("video frame pts:%{public}" PRId64 ".", mediaData->pts);
189 auto h264Frame = std::make_shared<H264Frame>(std::move(*buff));
190 h264Frame->dts_ = h264Frame->pts_ = static_cast<uint32_t>(mediaData->pts);
191 h264Frame->prefixSize_ = PrefixSize(h264Frame->Peek(), h264Frame->Size());
192 h264Frame->codecId_ = CODEC_H264;
193 tsPacker_->InputFrame(h264Frame);
194 }
195 }
196 }
197
HandleEvent(SharingEvent & event)198 int32_t WfdRtpProducer::HandleEvent(SharingEvent &event)
199 {
200 RETURN_INVALID_IF_NULL(event.eventMsg);
201 SHARING_LOGI("HandleEvent eventType: %{public}d.", event.eventMsg->type);
202 switch (event.eventMsg->type) {
203 case EventType::EVENT_WFD_MEDIA_INIT: {
204 HandleMediaInit(ConvertEventMsg<WfdProducerEventMsg>(event));
205 break;
206 }
207 default:
208 break;
209 }
210 return 0;
211 }
212
ProducerInit()213 bool WfdRtpProducer::ProducerInit()
214 {
215 SHARING_LOGD("trace.");
216 if (InitTsRtpPacker(ssrc_) != 0) {
217 SHARING_LOGE("init rtp packer failed.");
218 return false;
219 }
220
221 if (InitUdpClients() != 0) {
222 SHARING_LOGE("init udp clients failed.");
223 return false;
224 }
225
226 isInit_ = true;
227 return true;
228 }
229
InitTsRtpPacker(uint32_t ssrc,size_t mtuSize,uint32_t sampleRate,uint8_t pt,RtpPayloadStream ps)230 int32_t WfdRtpProducer::InitTsRtpPacker(uint32_t ssrc, size_t mtuSize, uint32_t sampleRate, uint8_t pt,
231 RtpPayloadStream ps)
232 {
233 SHARING_LOGD("trace.");
234 tsPacker_ = RtpFactory::CreateRtpPack(ssrc, mtuSize, sampleRate, pt, ps, 0);
235 if (tsPacker_ == nullptr) {
236 SHARING_LOGE("createRtpPacker failed.");
237 return -1;
238 }
239 tsPacker_->SetOnRtpPack([=](const RtpPacket::Ptr &rtp) {
240 MEDIA_LOGD("rtp packed seq: %{public}d timestamp: %{public}d size: %{public}d.", rtp->GetSeq(), rtp->GetStamp(),
241 rtp->Size());
242 SendDataBuffer(rtp);
243 });
244 return 0;
245 }
246
InitUdpClients()247 int32_t WfdRtpProducer::InitUdpClients()
248 {
249 SHARING_LOGD("trace.");
250 tsUdpClient_ = std::make_shared<UdpClient>(false);
251 tsRtcpUdpClient_ = std::make_shared<UdpClient>(true);
252 tsRtcpUdpClient_->SetUdpDataListener(shared_from_this());
253
254 SharingValue::Ptr values = nullptr;
255 auto ret = Config::GetInstance().GetConfig("mediachannel", "rtcpLimit", "timeout", values);
256 if (ret == CONFIGURE_ERROR_NONE) {
257 values->GetValue<int32_t>(rtcpCheckInterval_);
258 }
259
260 if (rtcpCheckInterval_ > 0) {
261 rtcpSendContext_ = std::make_shared<RtcpSenderContext>();
262 }
263
264 return 0;
265 }
266
SendDataBuffer(const DataBuffer::Ptr & buf,bool audio)267 bool WfdRtpProducer::SendDataBuffer(const DataBuffer::Ptr &buf, bool audio)
268 {
269 MEDIA_LOGD("trace.");
270 RETURN_FALSE_IF_NULL(buf);
271 (void)audio;
272 return tsUdpClient_->SendDataBuffer(buf);
273 }
274
Connect()275 int32_t WfdRtpProducer::Connect()
276 {
277 SHARING_LOGD("trace.");
278 if (tsPacker_ == nullptr) {
279 SHARING_LOGE("connect resource not useable.");
280 return -1;
281 }
282
283 if (tsUdpClient_ == nullptr || tsRtcpUdpClient_ == nullptr) {
284 SHARING_LOGE("udp network need init.");
285 return -1;
286 }
287 // create two udp client
288 if (!tsUdpClient_->Connect(primarySinkIp_, primarySinkPort_, localIp_, localPort_) ||
289 !tsRtcpUdpClient_->Connect(primarySinkIp_, primarySinkPort_ + 1, localIp_, localPort_ + 1)) {
290 SHARING_LOGE("createNetworkClient failed.");
291 return -1;
292 }
293
294 SHARING_LOGD("createNetworkClient success.");
295
296 isRunning_ = true;
297 return 0;
298 }
299
Stop()300 int32_t WfdRtpProducer::Stop()
301 {
302 SHARING_LOGD("producerId: %{public}u.", GetId());
303 isRunning_ = false;
304 if (tsUdpClient_ != nullptr) {
305 tsUdpClient_->Stop();
306 }
307
308 if (tsRtcpUdpClient_ != nullptr) {
309 tsRtcpUdpClient_->Stop();
310 }
311
312 dispatching_ = false;
313 NotifyReadStop();
314 if (dispatchThread_ && dispatchThread_->joinable()) {
315 dispatchThread_->join();
316 dispatchThread_ = nullptr;
317 }
318 return 0;
319 }
320
Release()321 int32_t WfdRtpProducer::Release()
322 {
323 SHARING_LOGD("producerId: %{public}u.", GetId());
324 if (tsUdpClient_ != nullptr) {
325 tsUdpClient_.reset();
326 }
327
328 if (tsRtcpUdpClient_ != nullptr) {
329 tsRtcpUdpClient_.reset();
330 }
331
332 if (tsPacker_ != nullptr) {
333 tsPacker_.reset();
334 }
335
336 if (rtcpSendContext_ != nullptr) {
337 rtcpSendContext_.reset();
338 }
339 return 0;
340 }
341
OnRtcpReadData(int32_t fd,DataBuffer::Ptr buf)342 void WfdRtpProducer::OnRtcpReadData(int32_t fd, DataBuffer::Ptr buf)
343 {
344 MEDIA_LOGD("trace.");
345 (void)fd;
346 if (buf && (buf->Size() > 0)) {
347 MEDIA_LOGD("recv rtcp rsp, producerId: %{public}u.", GetId());
348 rtcpOvertimes_ = 0;
349 }
350 }
351
OnRtcpTimeOut()352 void WfdRtpProducer::OnRtcpTimeOut()
353 {
354 MEDIA_LOGD("producerId: %{public}u tid: %{public}d.", GetId(), gettid());
355 auto listener = listener_.lock();
356 if (listener && (rtcpOvertimes_ >= 3)) { // 3 : DEFAULT_RTCP_OVERTIMES
357 SHARING_LOGW("rtcp time out,producerId: %{public}u,tid: %{public}d.", GetId(), gettid());
358 ProsumerStatusMsg::Ptr pMsg = std::make_shared<ProsumerStatusMsg>();
359 pMsg->prosumerId = GetId();
360 pMsg->agentId = GetSrcAgentId();
361 pMsg->eventMsg = std::make_shared<AgentEventMsg>();
362 pMsg->status = ProsumerNotifyStatus::PROSUMER_NOTIFY_ERROR;
363 pMsg->eventMsg->type = EVENT_AGENT_PROSUMER_ERROR;
364 pMsg->errorCode = ERR_PROSUMER_TIMEOUT;
365 listener->OnProducerNotify(pMsg);
366 rtcpOvertimes_ = 0;
367 return;
368 } else {
369 rtcpOvertimes_++;
370 }
371 if (rtcpSendContext_ && tsRtcpUdpClient_) {
372 tsRtcpUdpClient_->SendDataBuffer(rtcpSendContext_->CreateRtcpSR(ssrc_));
373 }
374 }
375
StartDispatchThread()376 void WfdRtpProducer::StartDispatchThread()
377 {
378 SHARING_LOGD("trace.");
379 dispatching_ = true;
380 dispatchThread_ = std::make_shared<std::thread>(&WfdRtpProducer::DispatchMediaData, this);
381 }
382
DispatchMediaData()383 void WfdRtpProducer::DispatchMediaData()
384 {
385 SHARING_LOGD("trace.");
386 uint32_t rtpCount = 0;
387 MediaData::Ptr mediaData = std::make_shared<MediaData>();
388 mediaData->buff = std::make_shared<DataBuffer>();
389 while (dispatching_) {
390 int32_t ret = RequestRead(MEDIA_TYPE_AV, [&mediaData](const MediaData::Ptr &data) {
391 mediaData->buff->ReplaceData(data->buff->Peek(), data->buff->Size());
392 mediaData->keyFrame = data->keyFrame;
393 mediaData->mediaType = data->mediaType;
394 mediaData->pts = data->pts;
395 mediaData->isRaw = data->isRaw;
396 mediaData->ssrc = data->ssrc;
397 });
398 if (ret != 0) {
399 SHARING_LOGE("Request media data err :%{public}d.", ret);
400 continue;
401 }
402 MEDIA_LOGD("receiver: %{public}u producerId: %{public}u received a av: %{public}d data :%{public}d \
403 from dispatcher: %{public}u.",
404 GetReceiverId(), GetId(), (int)mediaData->mediaType, mediaData->buff->Size(), GetDispatcherId());
405 if (mediaData->keyFrame) {
406 auto sps = GetSPS();
407 if (sps != nullptr && sps->buff != nullptr) {
408 auto spsFrame = std::make_shared<MediaData>(*sps);
409 spsFrame->buff = std::make_shared<DataBuffer>(*(sps->buff));
410 spsFrame->pts = mediaData->pts;
411 PublishOneFrame(spsFrame);
412 }
413
414 auto pps = GetPPS();
415 if (pps != nullptr && pps->buff != nullptr) {
416 auto ppsFrame = std::make_shared<MediaData>(*pps);
417 ppsFrame->buff = std::make_shared<DataBuffer>(*(pps->buff));
418 ppsFrame->pts = mediaData->pts;
419 PublishOneFrame(ppsFrame);
420 }
421 }
422
423 PublishOneFrame(mediaData);
424
425 if (rtcpCheckInterval_ > 0) {
426 ++rtpCount;
427 if ((rtpCount & 0x3ff) == 1023) { // 1023 : rtsp timeout
428 OnRtcpTimeOut();
429 }
430 }
431 }
432 }
433
HandleMediaInit(std::shared_ptr<WfdProducerEventMsg> msg)434 void WfdRtpProducer::HandleMediaInit(std::shared_ptr<WfdProducerEventMsg> msg)
435 {
436 if (!msg) {
437 SHARING_LOGE("msg is nullptr.");
438 return;
439 }
440 primarySinkPort_ = msg->port;
441 primarySinkIp_ = msg->ip;
442 localPort_ = msg->localPort;
443 localIp_ = msg->localIp;
444 SHARING_LOGD("primarySinkIp:%s port:%d localIp:%s localPort:%d.", GetAnonyString(primarySinkIp_).c_str(),
445 primarySinkPort_, GetAnonyString(localIp_).c_str(), localPort_);
446 SharingErrorCode errCode = ERR_OK;
447 if (!ProducerInit()) {
448 errCode = ERR_PROSUMER_INIT;
449 }
450 auto pPrivateMsg = std::make_shared<WfdSourceSessionEventMsg>();
451 pPrivateMsg->errorCode = errCode;
452 pPrivateMsg->type = EVENT_WFD_STATE_MEDIA_INIT;
453 pPrivateMsg->toMgr = ModuleType::MODULE_CONTEXT;
454 pPrivateMsg->fromMgr = ModuleType::MODULE_MEDIACHANNEL;
455 pPrivateMsg->srcId = GetId();
456 pPrivateMsg->dstId = msg->srcId;
457 pPrivateMsg->agentId = GetSrcAgentId();
458 pPrivateMsg->prosumerId = GetId();
459 NotifyPrivateEvent(pPrivateMsg);
460 }
461
462 REGISTER_CLASS_REFLECTOR(WfdRtpProducer);
463 } // namespace Sharing
464 } // namespace OHOS
465