• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rtp_codec_ts.h"
17 #include <securec.h>
18 #include "common/common_macro.h"
19 #include "common/media_log.h"
20 #include "frame/aac_frame.h"
21 #include "frame/h264_frame.h"
22 
namespace OHOS {
namespace Sharing {
// Size in bytes of the custom AVIO read buffer handed to the TS demuxer
// (roughly one Ethernet MTU, matching a single RTP payload).
constexpr int32_t FF_BUFFER_SIZE = 1500;
// Guards onFrame_ between the caller thread (Release/SetOnFrame) and the
// decode thread that invokes the callback.
static std::mutex frameLock;
27 
// RtpDecoderTs: demuxes an RTP-carried MPEG-TS stream via FFmpeg and emits
// elementary H.264/AAC frames. Construction is cheap; the decode thread is
// started lazily on the first InputRtp() call.
RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}
32 
// Destructor delegates to Release(): detaches the callback, joins the decode
// thread, and frees the FFmpeg demuxer/IO contexts.
RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}
38 
/**
 * @brief Tear down the decoder. The order below is deliberate:
 *  1. Clear onFrame_ under frameLock so the decode thread cannot invoke the
 *     callback on a dying owner.
 *  2. Set exit_ and join the decode thread, so nothing touches the AV
 *     contexts afterwards.
 *  3. Only then free the FFmpeg format and IO contexts.
 */
void RtpDecoderTs::Release()
{
    {
        std::lock_guard<std::mutex> lock(frameLock);
        onFrame_ = nullptr;
    }

    exit_ = true;
    if (decodeThread_ && decodeThread_->joinable()) {
        decodeThread_->join();
        decodeThread_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_close_input(&avFormatContext_);
    }

    if (avioContext_) {
        // avio_context_free() does not release the IO buffer; free it first
        // through avioContext_->buffer (FFmpeg may have replaced the original
        // pointer internally).
        av_freep(&avioContext_->buffer);
        avio_context_free(&avioContext_);
    }
}
61 
InputRtp(const RtpPacket::Ptr & rtp)62 void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
63 {
64     MEDIA_LOGD("trace.");
65     RETURN_IF_NULL(rtp);
66     if (exit_) {
67         return;
68     }
69 
70     if (decodeThread_ == nullptr) {
71         decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
72     }
73 
74     auto payload_size = rtp->GetPayloadSize();
75     if (payload_size <= 0) {
76         return;
77     }
78 
79     std::lock_guard<std::mutex> lock(queueMutex_);
80     dataQueue_.emplace(rtp);
81 }
82 
SetOnFrame(const OnFrame & cb)83 void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
84 {
85     onFrame_ = cb;
86 }
87 
StartDecoding()88 void RtpDecoderTs::StartDecoding()
89 {
90     SHARING_LOGE("trace.");
91     avFormatContext_ = avformat_alloc_context();
92     if (avFormatContext_ == nullptr) {
93         SHARING_LOGE("avformat_alloc_context failed.");
94         return;
95     }
96 
97     auto buffer = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
98     if (buffer == nullptr) {
99         SHARING_LOGE("av_malloc failed.");
100         return;
101     }
102     avioContext_ =
103         avio_alloc_context(buffer, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
104     if (avioContext_ == nullptr) {
105         SHARING_LOGE("avio_alloc_context failed.");
106         av_freep(&buffer);
107         return;
108     }
109     avFormatContext_->pb = avioContext_;
110 
111     int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
112     if (ret != 0) {
113         SHARING_LOGE("avformat_open_input failed.");
114         return;
115     }
116 
117     AVRational videoTimeBase;
118     AVRational audioTimeBase;
119     AVRational PtsTimeBase = {1, US_PER_SEC};
120 
121     for (int32_t i = 0; i < static_cast<int32_t>(avFormatContext_->nb_streams); i++) {
122         if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
123             videoTimeBase = avFormatContext_->streams[i]->time_base;
124             if (videoTimeBase.den == 0) {
125                 videoTimeBase = AV_TIME_BASE_Q;
126             }
127             videoStreamIndex_ = i;
128             SHARING_LOGD("find video stream %{public}u.", i);
129         } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
130             audioTimeBase = avFormatContext_->streams[i]->time_base;
131             if (audioTimeBase.den == 0) {
132                 audioTimeBase = AV_TIME_BASE_Q;
133             }
134             audioStreamIndex_ = i;
135             SHARING_LOGD("find audio stream %{public}u.", i);
136         }
137     }
138 
139     AVPacket *packet = av_packet_alloc();
140     if (packet == nullptr) {
141         av_freep(&buffer);
142         return;
143     }
144     while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
145         if (exit_) {
146             SHARING_LOGI("ignore av read frame.");
147             break;
148         }
149         if (packet->stream_index == videoStreamIndex_) {
150             SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
151                 if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
152                     return;
153                 }
154                 int64_t ptsUsec = av_rescale_q(packet->pts, videoTimeBase, PtsTimeBase);
155                 auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
156                                                             (uint64_t)ptsUsec, prefix);
157                 std::lock_guard<std::mutex> lock(frameLock);
158                 if (onFrame_) {
159                     onFrame_(outFrame);
160                 }
161             });
162         } else if (packet->stream_index == audioStreamIndex_) {
163             int64_t ptsUsec = av_rescale_q(packet->pts, audioTimeBase, PtsTimeBase);
164             auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
165                                                         (uint64_t)ptsUsec);
166             std::lock_guard<std::mutex> lock(frameLock);
167             if (onFrame_) {
168                 onFrame_(outFrame);
169             }
170         }
171         av_packet_unref(packet);
172     }
173 
174     av_packet_free(&packet);
175     SHARING_LOGD("ts decoding Thread_ exit.");
176 }
177 
StaticReadPacket(void * opaque,uint8_t * buf,int buf_size)178 int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
179 {
180     RETURN_INVALID_IF_NULL(opaque);
181     RETURN_INVALID_IF_NULL(buf);
182     RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
183     if (decoder == nullptr) {
184         SHARING_LOGE("decoder is nullptr.");
185         return 0;
186     }
187     return decoder->ReadPacket(buf, buf_size);
188 }
189 
ReadPacket(uint8_t * buf,int buf_size)190 int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
191 {
192     RETURN_INVALID_IF_NULL(buf);
193     while (dataQueue_.empty()) {
194         if (exit_ == true) {
195             SHARING_LOGI("read packet exit.");
196             return 0;
197         }
198         std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait times
199     }
200 
201     std::unique_lock<std::mutex> lock(queueMutex_);
202     auto &rtp = dataQueue_.front();
203     auto data = rtp->GetPayload();
204     if (data == nullptr) {
205         SHARING_LOGE("payload null");
206         return 0;
207     }
208     int length = static_cast<int>(rtp->GetPayloadSize());
209     if (length > buf_size) {
210         SHARING_LOGE("rtp length exceed buf_size!");
211         return 0;
212     }
213     auto ret = memcpy_s(buf, length, data, length);
214     if (ret != EOK) {
215         return 0;
216     }
217 
218     dataQueue_.pop();
219     return length;
220 }
221 
// RtpEncoderTs: muxes input frames into an MPEG-TS stream and packs the TS
// bytes into RTP packets (see WritePacket). The frame merger is configured to
// prepend SPS/PPS to key frames (H264_PREFIX mode).
// Note: RtpMaker takes (ssrc, mtuSize, payloadType, sampleRate, seq) — the
// payloadType/sampleRate order differs from this constructor's parameters.
RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
    : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
{
    SHARING_LOGI("RtpEncoderTs CTOR IN");
    merger_.SetType(FrameMerger::H264_PREFIX);
}
228 
// Destructor delegates to Release(): detaches the RTP callback, joins the
// encode thread, drains the frame queue, and frees the FFmpeg contexts.
RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGI("RtpEncoderTs DTOR IN");
    Release();
}
234 
/**
 * @brief Tear down the encoder. The order below is deliberate:
 *  1. Clear onRtpPack_ under cbLockMutex_ so WritePacket() cannot call back
 *     into a dying owner.
 *  2. Set exit_ and join the encode thread before touching shared state.
 *  3. Drain the frame queue, then free the FFmpeg IO/format contexts.
 */
void RtpEncoderTs::Release()
{
    SHARING_LOGD("trace.");
    {
        std::lock_guard<std::mutex> lock(cbLockMutex_);
        onRtpPack_ = nullptr;
    }
    exit_ = true;
    if (encodeThread_ && encodeThread_->joinable()) {
        encodeThread_->join();
        encodeThread_ = nullptr;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!dataQueue_.empty()) {
            dataQueue_.pop();
        }
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    // The IO buffer is tracked separately (avioCtxBuffer_) because
    // avio_context_free() does not release it.
    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }

    // Custom-IO output context: avformat_free_context(), not
    // avformat_close_input(), is the matching release call here.
    if (avFormatContext_) {
        avformat_free_context(avFormatContext_);
        avFormatContext_ = nullptr;
    }
}
269 
InputFrame(const Frame::Ptr & frame)270 void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
271 {
272     RETURN_IF_NULL(frame);
273     if (exit_) {
274         return;
275     }
276 
277     DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
278     switch (frame->GetCodecId()) {
279         case CODEC_H264:
280             // merge sps, pps and key frame into one packet
281             merger_.InputFrame(
282                 frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
283                     auto outFrame =
284                         std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
285                                                     PrefixSize((char *)buffer->Data(), buffer->Size()));
286                     SaveFrame(outFrame);
287                 });
288             break;
289         case CODEC_AAC:
290         case CODEC_PCM:
291             SaveFrame(frame);
292             break;
293         default:
294             SHARING_LOGW("Unknown codec: %d", frame->GetCodecId());
295             break;
296     }
297 
298     if (audioCodeId_ == AV_CODEC_ID_NONE) {
299         if (frame->GetCodecId() == CODEC_AAC) {
300             audioCodeId_ = AV_CODEC_ID_AAC;
301         } else if (frame->GetCodecId() == CODEC_PCM) {
302             audioCodeId_ = AV_CODEC_ID_PCM_S16BE;
303         }
304     }
305     if (encodeThread_ == nullptr && audioCodeId_ != AV_CODEC_ID_NONE) {
306         encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
307     }
308 }
309 
SetOnRtpPack(const OnRtpPack & cb)310 void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
311 {
312     SHARING_LOGD("trace.");
313     onRtpPack_ = cb;
314 }
315 
StartEncoding()316 void RtpEncoderTs::StartEncoding()
317 {
318     SHARING_LOGD("trace.");
319     avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
320     if (avFormatContext_ == nullptr) {
321         SHARING_LOGE("avformat_alloc_output_context2 failed.");
322         return;
323     }
324 
325     videoStream = avformat_new_stream(avFormatContext_, NULL);
326     videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
327     videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
328     videoStream->codecpar->codec_tag = 0;
329     videoStream->time_base.num = 1;
330     videoStream->time_base.den = SAMPLE_RATE_90K; // 90000: video sample rate
331 
332     audioStream = avformat_new_stream(avFormatContext_, NULL);
333     audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
334     audioStream->codecpar->codec_id = audioCodeId_;
335     audioStream->codecpar->codec_tag = 0;
336     audioStream->codecpar->channel_layout = (uint64_t)av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
337     audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
338     audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
339     audioStream->time_base.num = 1;
340     audioStream->time_base.den = AUDIO_SAMPLE_RATE_48000;
341     SHARING_LOGI("audio stream id: %{public}d, video stream id: %{public}d, audio codecid: %{public}d.",
342         audioStream->index, videoStream->index, audioCodeId_);
343 
344     avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
345     avioContext_ =
346         avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
347     if (avioContext_ == nullptr) {
348         SHARING_LOGE("avio_alloc_context failed.");
349         return;
350     }
351     avFormatContext_->pb = avioContext_;
352     avFormatContext_->flags = (int32_t)((uint32_t)(avFormatContext_->flags) | AVFMT_FLAG_CUSTOM_IO);
353 
354     int32_t ret = avformat_write_header(avFormatContext_, NULL);
355     if (ret < 0) {
356         SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
357         return;
358     }
359 
360     while (!exit_) {
361         AVPacket *packet = av_packet_alloc();
362         auto frame = ReadFrame(packet);
363         if (frame == nullptr) {
364             break;
365         }
366         av_write_frame(avFormatContext_, packet);
367     }
368 
369     av_write_trailer(avFormatContext_);
370 }
371 
SaveFrame(Frame::Ptr frame)372 void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
373 {
374     RETURN_IF_NULL(frame);
375     std::lock_guard<std::mutex> lock(queueMutex_);
376     dataQueue_.emplace(frame);
377 }
378 
ReadFrame(AVPacket * packet)379 Frame::Ptr RtpEncoderTs::ReadFrame(AVPacket *packet)
380 {
381     if (packet == nullptr) {
382         SHARING_LOGE("packet is null!");
383         return nullptr;
384     }
385     while (dataQueue_.empty()) {
386         if (exit_ == true) {
387             SHARING_LOGI("exit when read frame.");
388             return nullptr;
389         }
390         std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait time
391     }
392 
393     std::unique_lock<std::mutex> lock(queueMutex_);
394     Frame::Ptr frame = dataQueue_.front();
395     dataQueue_.pop();
396     packet->data = frame->Data();
397     packet->size = frame->Size();
398 
399     keyFrame_ = frame->KeyFrame();
400     if (frame->GetTrackType() == TRACK_VIDEO) {
401         packet->dts = av_rescale(frame->Dts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
402         packet->pts = av_rescale(frame->Pts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
403         packet->stream_index = videoStream->index;
404         timeStamp_ = frame->Dts();
405     } else if (frame->GetTrackType() == TRACK_AUDIO) {
406         packet->dts = av_rescale(frame->Dts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
407         packet->pts = av_rescale(frame->Pts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
408         packet->stream_index = audioStream->index;
409         timeStamp_ = frame->Dts();
410     }
411     return frame;
412 }
413 
RemoveFrameAfterMuxing()414 void RtpEncoderTs::RemoveFrameAfterMuxing()
415 {
416     std::unique_lock<std::mutex> lock(queueMutex_);
417     dataQueue_.pop();
418 }
419 
WritePacket(void * opaque,uint8_t * buf,int buf_size)420 int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
421 {
422     RETURN_INVALID_IF_NULL(opaque);
423     RETURN_INVALID_IF_NULL(buf);
424 
425     RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
426     std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
427     if (encoder->onRtpPack_) {
428         auto rtp =
429             encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
430         encoder->onRtpPack_(rtp);
431     }
432 
433     return 0;
434 }
435 } // namespace Sharing
436 } // namespace OHOS
437