• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "wfd_rtp_producer.h"
17 #include <unistd.h>
18 #include "common/common_macro.h"
19 #include "common/reflect_registration.h"
20 #include "configuration/include/config.h"
21 #include "extend/magic_enum/magic_enum.hpp"
22 #include "protocol/frame/aac_frame.h"
23 #include "protocol/frame/h264_frame.h"
24 #include "utils/utils.h"
25 #include "wfd_media_def.h"
26 #include "wfd_session_def.h"
27 
28 namespace OHOS {
29 namespace Sharing {
30 
UdpClient(bool rtcp)31 WfdRtpProducer::UdpClient::UdpClient(bool rtcp) : rtcp_(rtcp) {}
32 
~UdpClient()33 WfdRtpProducer::UdpClient::~UdpClient()
34 {
35     Release();
36 }
37 
OnClientClose(int32_t fd)38 void WfdRtpProducer::UdpClient::OnClientClose(int32_t fd)
39 {
40     SHARING_LOGD("trace.");
41     (void)fd;
42 }
43 
OnClientWriteable(int32_t fd)44 void WfdRtpProducer::UdpClient::OnClientWriteable(int32_t fd)
45 {
46     SHARING_LOGD("trace.");
47     (void)fd;
48 }
49 
OnClientException(int32_t fd)50 void WfdRtpProducer::UdpClient::OnClientException(int32_t fd)
51 {
52     SHARING_LOGD("trace.");
53     (void)fd;
54 }
55 
OnClientConnect(bool isSuccess)56 void WfdRtpProducer::UdpClient::OnClientConnect(bool isSuccess)
57 {
58     SHARING_LOGD("trace.");
59     (void)isSuccess;
60 }
61 
OnClientReadData(int32_t fd,DataBuffer::Ptr buf)62 void WfdRtpProducer::UdpClient::OnClientReadData(int32_t fd, DataBuffer::Ptr buf)
63 {
64     SHARING_LOGD("trace.");
65     auto listener = udpDataListener_.lock();
66     if (listener && rtcp_) {
67         listener->OnRtcpReadData(fd, buf);
68     }
69 }
70 
SetUdpDataListener(std::weak_ptr<WfdRtpProducer> udpDataListener)71 void WfdRtpProducer::UdpClient::SetUdpDataListener(std::weak_ptr<WfdRtpProducer> udpDataListener)
72 {
73     SHARING_LOGD("trace.");
74     udpDataListener_ = udpDataListener;
75 }
76 
Stop()77 void WfdRtpProducer::UdpClient::Stop()
78 {
79     SHARING_LOGD("trace.");
80     if (networkClientPtr_ != nullptr) {
81         networkClientPtr_->Disconnect();
82         networkClientPtr_.reset();
83     }
84 }
85 
Release()86 void WfdRtpProducer::UdpClient::Release()
87 {
88     SHARING_LOGD("trace.");
89     Stop();
90 }
91 
Connect(const std::string & peerIp,uint16_t peerPort,const std::string & localIp,uint16_t localPort)92 bool WfdRtpProducer::UdpClient::Connect(const std::string &peerIp, uint16_t peerPort, const std::string &localIp,
93                                         uint16_t localPort)
94 {
95     SHARING_LOGD("trace.");
96     return NetworkFactory::CreateUdpClient(peerIp, peerPort, localIp, localPort, shared_from_this(), networkClientPtr_);
97 }
98 
SendDataBuffer(const DataBuffer::Ptr & buf)99 bool WfdRtpProducer::UdpClient::SendDataBuffer(const DataBuffer::Ptr &buf)
100 {
101     MEDIA_LOGD("trace.");
102     if (networkClientPtr_ && buf) {
103         return networkClientPtr_->Send(buf, buf->Size());
104     }
105     return false;
106 }
107 
WfdRtpProducer()108 WfdRtpProducer::WfdRtpProducer()
109 {
110     SHARING_LOGD("ctor.");
111 }
112 
~WfdRtpProducer()113 WfdRtpProducer::~WfdRtpProducer()
114 {
115     SHARING_LOGD("dtor producer Id: %{public}u.", GetId());
116 }
117 
UpdateOperation(ProsumerStatusMsg::Ptr & statusMsg)118 void WfdRtpProducer::UpdateOperation(ProsumerStatusMsg::Ptr &statusMsg)
119 {
120     SHARING_LOGD("trace.");
121     RETURN_IF_NULL(statusMsg);
122     SHARING_LOGD("status: %{public}s producerId: %{public}u.",
123                  std::string(magic_enum::enum_name(static_cast<ProsumerOptRunningStatus>(statusMsg->status))).c_str(),
124                  GetId());
125     switch (statusMsg->status) {
126         case ProsumerOptRunningStatus::PROSUMER_INIT:
127             statusMsg->status = PROSUMER_NOTIFY_INIT_SUCCESS;
128             break;
129         case PROSUMER_START: {
130             isPaused_ = false;
131             if (isInit_ && (Connect() == 0)) {
132                 statusMsg->status = PROSUMER_NOTIFY_START_SUCCESS;
133             } else {
134                 statusMsg->status = PROSUMER_NOTIFY_ERROR;
135                 statusMsg->errorCode = ERR_PROSUMER_START;
136             }
137             break;
138         }
139         case PROSUMER_PAUSE: {
140             isPaused_ = true;
141             statusMsg->status = PROSUMER_NOTIFY_PAUSE_SUCCESS;
142             break;
143         }
144 
145         case PROSUMER_RESUME: {
146             isPaused_ = false;
147             statusMsg->status = PROSUMER_NOTIFY_RESUME_SUCCESS;
148             break;
149         }
150 
151         case PROSUMER_STOP: {
152             if (Stop() == 0) {
153                 statusMsg->status = PROSUMER_NOTIFY_STOP_SUCCESS;
154             } else {
155                 statusMsg->status = PROSUMER_NOTIFY_ERROR;
156                 statusMsg->errorCode = ERR_PROSUMER_STOP;
157             }
158             break;
159         }
160 
161         case PROSUMER_DESTROY: {
162             Release();
163             statusMsg->status = PROSUMER_NOTIFY_DESTROY_SUCCESS;
164             break;
165         }
166 
167         default:
168             break;
169     }
170     Notify(statusMsg);
171 }
172 
PublishOneFrame(const MediaData::Ptr & mediaData)173 void WfdRtpProducer::PublishOneFrame(const MediaData::Ptr &mediaData)
174 {
175     MEDIA_LOGD("trace.");
176     RETURN_IF_NULL(mediaData);
177     RETURN_IF_NULL(mediaData->buff);
178     RETURN_IF_NULL(mediaData->buff->Peek());
179     auto buff = mediaData->buff;
180     if (isRunning_ && !isPaused_) { // paused check at this pos or in DispatchMediaData
181         if (mediaData->mediaType == MEDIA_TYPE_AUDIO) {
182             MEDIA_LOGD("audio frame pts:%{public}" PRId64 ".", mediaData->pts);
183             auto aacFrame = FrameImpl::CreateFrom(std::move(*buff));
184             aacFrame->codecId_ = CodecId::CODEC_AAC;
185             aacFrame->dts_ = aacFrame->pts_ = static_cast<uint32_t>(mediaData->pts);
186             tsPacker_->InputFrame(aacFrame);
187         } else if (mediaData->mediaType == MEDIA_TYPE_VIDEO) {
188             MEDIA_LOGD("video frame pts:%{public}" PRId64 ".", mediaData->pts);
189             auto h264Frame = std::make_shared<H264Frame>(std::move(*buff));
190             h264Frame->dts_ = h264Frame->pts_ = static_cast<uint32_t>(mediaData->pts);
191             h264Frame->prefixSize_ = PrefixSize(h264Frame->Peek(), h264Frame->Size());
192             h264Frame->codecId_ = CODEC_H264;
193             tsPacker_->InputFrame(h264Frame);
194         }
195     }
196 }
197 
HandleEvent(SharingEvent & event)198 int32_t WfdRtpProducer::HandleEvent(SharingEvent &event)
199 {
200     RETURN_INVALID_IF_NULL(event.eventMsg);
201     SHARING_LOGI("HandleEvent eventType: %{public}d.", event.eventMsg->type);
202     switch (event.eventMsg->type) {
203         case EventType::EVENT_WFD_MEDIA_INIT: {
204             HandleMediaInit(ConvertEventMsg<WfdProducerEventMsg>(event));
205             break;
206         }
207         default:
208             break;
209     }
210     return 0;
211 }
212 
ProducerInit()213 bool WfdRtpProducer::ProducerInit()
214 {
215     SHARING_LOGD("trace.");
216     if (InitTsRtpPacker(ssrc_) != 0) {
217         SHARING_LOGE("init rtp packer failed.");
218         return false;
219     }
220 
221     if (InitUdpClients() != 0) {
222         SHARING_LOGE("init udp clients failed.");
223         return false;
224     }
225 
226     isInit_ = true;
227     return true;
228 }
229 
InitTsRtpPacker(uint32_t ssrc,size_t mtuSize,uint32_t sampleRate,uint8_t pt,RtpPayloadStream ps)230 int32_t WfdRtpProducer::InitTsRtpPacker(uint32_t ssrc, size_t mtuSize, uint32_t sampleRate, uint8_t pt,
231                                         RtpPayloadStream ps)
232 {
233     SHARING_LOGD("trace.");
234     tsPacker_ = RtpFactory::CreateRtpPack(ssrc, mtuSize, sampleRate, pt, ps, 0);
235     if (tsPacker_ == nullptr) {
236         SHARING_LOGE("createRtpPacker failed.");
237         return -1;
238     }
239     tsPacker_->SetOnRtpPack([=](const RtpPacket::Ptr &rtp) {
240         MEDIA_LOGD("rtp packed seq: %{public}d timestamp: %{public}d size: %{public}d.", rtp->GetSeq(), rtp->GetStamp(),
241                    rtp->Size());
242         SendDataBuffer(rtp);
243     });
244     return 0;
245 }
246 
InitUdpClients()247 int32_t WfdRtpProducer::InitUdpClients()
248 {
249     SHARING_LOGD("trace.");
250     tsUdpClient_ = std::make_shared<UdpClient>(false);
251     tsRtcpUdpClient_ = std::make_shared<UdpClient>(true);
252     tsRtcpUdpClient_->SetUdpDataListener(shared_from_this());
253 
254     SharingValue::Ptr values = nullptr;
255     auto ret = Config::GetInstance().GetConfig("mediachannel", "rtcpLimit", "timeout", values);
256     if (ret == CONFIGURE_ERROR_NONE) {
257         values->GetValue<int32_t>(rtcpCheckInterval_);
258     }
259 
260     if (rtcpCheckInterval_ > 0) {
261         rtcpSendContext_ = std::make_shared<RtcpSenderContext>();
262     }
263 
264     return 0;
265 }
266 
SendDataBuffer(const DataBuffer::Ptr & buf,bool audio)267 bool WfdRtpProducer::SendDataBuffer(const DataBuffer::Ptr &buf, bool audio)
268 {
269     MEDIA_LOGD("trace.");
270     RETURN_FALSE_IF_NULL(buf);
271     (void)audio;
272     return tsUdpClient_->SendDataBuffer(buf);
273 }
274 
Connect()275 int32_t WfdRtpProducer::Connect()
276 {
277     SHARING_LOGD("trace.");
278     if (tsPacker_ == nullptr) {
279         SHARING_LOGE("connect resource not useable.");
280         return -1;
281     }
282 
283     if (tsUdpClient_ == nullptr || tsRtcpUdpClient_ == nullptr) {
284         SHARING_LOGE("udp network need init.");
285         return -1;
286     }
287     // create two udp client
288     if (!tsUdpClient_->Connect(primarySinkIp_, primarySinkPort_, localIp_, localPort_) ||
289         !tsRtcpUdpClient_->Connect(primarySinkIp_, primarySinkPort_ + 1, localIp_, localPort_ + 1)) {
290         SHARING_LOGE("createNetworkClient failed.");
291         return -1;
292     }
293 
294     SHARING_LOGD("createNetworkClient success.");
295 
296     isRunning_ = true;
297     return 0;
298 }
299 
Stop()300 int32_t WfdRtpProducer::Stop()
301 {
302     SHARING_LOGD("producerId: %{public}u.", GetId());
303     isRunning_ = false;
304     if (tsUdpClient_ != nullptr) {
305         tsUdpClient_->Stop();
306     }
307 
308     if (tsRtcpUdpClient_ != nullptr) {
309         tsRtcpUdpClient_->Stop();
310     }
311 
312     dispatching_ = false;
313     NotifyReadStop();
314     if (dispatchThread_ && dispatchThread_->joinable()) {
315         dispatchThread_->join();
316         dispatchThread_ = nullptr;
317     }
318     return 0;
319 }
320 
Release()321 int32_t WfdRtpProducer::Release()
322 {
323     SHARING_LOGD("producerId: %{public}u.", GetId());
324     if (tsUdpClient_ != nullptr) {
325         tsUdpClient_.reset();
326     }
327 
328     if (tsRtcpUdpClient_ != nullptr) {
329         tsRtcpUdpClient_.reset();
330     }
331 
332     if (tsPacker_ != nullptr) {
333         tsPacker_.reset();
334     }
335 
336     if (rtcpSendContext_ != nullptr) {
337         rtcpSendContext_.reset();
338     }
339     return 0;
340 }
341 
OnRtcpReadData(int32_t fd,DataBuffer::Ptr buf)342 void WfdRtpProducer::OnRtcpReadData(int32_t fd, DataBuffer::Ptr buf)
343 {
344     MEDIA_LOGD("trace.");
345     (void)fd;
346     if (buf && (buf->Size() > 0)) {
347         MEDIA_LOGD("recv rtcp rsp, producerId: %{public}u.", GetId());
348         rtcpOvertimes_ = 0;
349     }
350 }
351 
OnRtcpTimeOut()352 void WfdRtpProducer::OnRtcpTimeOut()
353 {
354     MEDIA_LOGD("producerId: %{public}u tid: %{public}d.", GetId(), gettid());
355     auto listener = listener_.lock();
356     if (listener && (rtcpOvertimes_ >= 3)) { // 3 : DEFAULT_RTCP_OVERTIMES
357         SHARING_LOGW("rtcp time out,producerId: %{public}u,tid: %{public}d.", GetId(), gettid());
358         ProsumerStatusMsg::Ptr pMsg = std::make_shared<ProsumerStatusMsg>();
359         pMsg->prosumerId = GetId();
360         pMsg->agentId = GetSrcAgentId();
361         pMsg->eventMsg = std::make_shared<AgentEventMsg>();
362         pMsg->status = ProsumerNotifyStatus::PROSUMER_NOTIFY_ERROR;
363         pMsg->eventMsg->type = EVENT_AGENT_PROSUMER_ERROR;
364         pMsg->errorCode = ERR_PROSUMER_TIMEOUT;
365         listener->OnProducerNotify(pMsg);
366         rtcpOvertimes_ = 0;
367         return;
368     } else {
369         rtcpOvertimes_++;
370     }
371     if (rtcpSendContext_ && tsRtcpUdpClient_) {
372         tsRtcpUdpClient_->SendDataBuffer(rtcpSendContext_->CreateRtcpSR(ssrc_));
373     }
374 }
375 
StartDispatchThread()376 void WfdRtpProducer::StartDispatchThread()
377 {
378     SHARING_LOGD("trace.");
379     dispatching_ = true;
380     dispatchThread_ = std::make_shared<std::thread>(&WfdRtpProducer::DispatchMediaData, this);
381 }
382 
DispatchMediaData()383 void WfdRtpProducer::DispatchMediaData()
384 {
385     SHARING_LOGD("trace.");
386     uint32_t rtpCount = 0;
387     MediaData::Ptr mediaData = std::make_shared<MediaData>();
388     mediaData->buff = std::make_shared<DataBuffer>();
389     while (dispatching_) {
390         int32_t ret = RequestRead(MEDIA_TYPE_AV, [&mediaData](const MediaData::Ptr &data) {
391             mediaData->buff->ReplaceData(data->buff->Peek(), data->buff->Size());
392             mediaData->keyFrame = data->keyFrame;
393             mediaData->mediaType = data->mediaType;
394             mediaData->pts = data->pts;
395             mediaData->isRaw = data->isRaw;
396             mediaData->ssrc = data->ssrc;
397         });
398         if (ret != 0) {
399             SHARING_LOGE("Request media data err :%{public}d.", ret);
400             continue;
401         }
402         MEDIA_LOGD("receiver: %{public}u producerId: %{public}u received a av: %{public}d data :%{public}d \
403                    from dispatcher: %{public}u.",
404                    GetReceiverId(), GetId(), (int)mediaData->mediaType, mediaData->buff->Size(), GetDispatcherId());
405         if (mediaData->keyFrame) {
406             auto sps = GetSPS();
407             if (sps != nullptr && sps->buff != nullptr) {
408                 auto spsFrame = std::make_shared<MediaData>(*sps);
409                 spsFrame->buff = std::make_shared<DataBuffer>(*(sps->buff));
410                 spsFrame->pts = mediaData->pts;
411                 PublishOneFrame(spsFrame);
412             }
413 
414             auto pps = GetPPS();
415             if (pps != nullptr && pps->buff != nullptr) {
416                 auto ppsFrame = std::make_shared<MediaData>(*pps);
417                 ppsFrame->buff = std::make_shared<DataBuffer>(*(pps->buff));
418                 ppsFrame->pts = mediaData->pts;
419                 PublishOneFrame(ppsFrame);
420             }
421         }
422 
423         PublishOneFrame(mediaData);
424 
425         if (rtcpCheckInterval_ > 0) {
426             ++rtpCount;
427             if ((rtpCount & 0x3ff) == 1023) { // 1023 : rtsp timeout
428                 OnRtcpTimeOut();
429             }
430         }
431     }
432 }
433 
HandleMediaInit(std::shared_ptr<WfdProducerEventMsg> msg)434 void WfdRtpProducer::HandleMediaInit(std::shared_ptr<WfdProducerEventMsg> msg)
435 {
436     if (!msg) {
437         SHARING_LOGE("msg is nullptr.");
438         return;
439     }
440     primarySinkPort_ = msg->port;
441     primarySinkIp_ = msg->ip;
442     localPort_ = msg->localPort;
443     localIp_ = msg->localIp;
444     SHARING_LOGD("primarySinkIp:%s port:%d localIp:%s localPort:%d.", GetAnonyString(primarySinkIp_).c_str(),
445                  primarySinkPort_, GetAnonyString(localIp_).c_str(), localPort_);
446     SharingErrorCode errCode = ERR_OK;
447     if (!ProducerInit()) {
448         errCode = ERR_PROSUMER_INIT;
449     }
450     auto pPrivateMsg = std::make_shared<WfdSourceSessionEventMsg>();
451     pPrivateMsg->errorCode = errCode;
452     pPrivateMsg->type = EVENT_WFD_STATE_MEDIA_INIT;
453     pPrivateMsg->toMgr = ModuleType::MODULE_CONTEXT;
454     pPrivateMsg->fromMgr = ModuleType::MODULE_MEDIACHANNEL;
455     pPrivateMsg->srcId = GetId();
456     pPrivateMsg->dstId = msg->srcId;
457     pPrivateMsg->agentId = GetSrcAgentId();
458     pPrivateMsg->prosumerId = GetId();
459     NotifyPrivateEvent(pPrivateMsg);
460 }
461 
462 REGISTER_CLASS_REFLECTOR(WfdRtpProducer);
463 } // namespace Sharing
464 } // namespace OHOS
465