• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "rtp_codec_ts.h"
17 #include <securec.h>
18 #include "common/common_macro.h"
19 #include "common/media_log.h"
20 #include "frame/aac_frame.h"
21 #include "frame/h264_frame.h"
22 
23 namespace OHOS {
24 namespace Sharing {
namespace {
// Size, in bytes, of the buffer passed to avio_alloc_context() by RtpDecoderTs::StartDecoding().
constexpr int32_t FF_BUFFER_SIZE = 1500;
// Guards RtpDecoderTs::onFrame_. This is one file-level object, so it is shared by
// every RtpDecoderTs instance defined in this translation unit.
std::mutex frameLock;
} // namespace
27 
RtpDecoderTs()28 RtpDecoderTs::RtpDecoderTs()
29 {
30     MEDIA_LOGD("RtpDecoderTs CTOR IN.");
31 }
32 
~RtpDecoderTs()33 RtpDecoderTs::~RtpDecoderTs()
34 {
35     SHARING_LOGI("RtpDecoderTs DTOR IN.");
36     Release();
37 }
38 
Release()39 void RtpDecoderTs::Release()
40 {
41     {
42         std::lock_guard<std::mutex> lock(frameLock);
43         onFrame_ = nullptr;
44     }
45 
46     exit_ = true;
47     if (decodeThread_ && decodeThread_->joinable()) {
48         decodeThread_->join();
49         decodeThread_ = nullptr;
50     }
51 
52     if (avFormatContext_) {
53         avformat_close_input(&avFormatContext_);
54     }
55 
56     if (avioContext_) {
57         av_freep(&avioContext_->buffer);
58         avio_context_free(&avioContext_);
59     }
60 }
61 
InputRtp(const RtpPacket::Ptr & rtp)62 void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
63 {
64     MEDIA_LOGD("trace.");
65     RETURN_IF_NULL(rtp);
66     if (exit_) {
67         return;
68     }
69 
70     if (decodeThread_ == nullptr) {
71         decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
72     }
73 
74     auto payload_size = rtp->GetPayloadSize();
75     if (payload_size <= 0) {
76         return;
77     }
78 
79     std::lock_guard<std::mutex> lock(queueMutex_);
80     dataQueue_.emplace(rtp);
81 }
82 
SetOnFrame(const OnFrame & cb)83 void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
84 {
85     onFrame_ = cb;
86 }
87 
StartDecoding()88 void RtpDecoderTs::StartDecoding()
89 {
90     SHARING_LOGE("trace.");
91     avFormatContext_ = avformat_alloc_context();
92     if (avFormatContext_ == nullptr) {
93         SHARING_LOGE("avformat_alloc_context failed.");
94         return;
95     }
96 
97     auto buffer = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
98     avioContext_ =
99         avio_alloc_context(buffer, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
100     if (avioContext_ == nullptr) {
101         SHARING_LOGE("avio_alloc_context failed.");
102         return;
103     }
104     avFormatContext_->pb = avioContext_;
105 
106     int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
107     if (ret != 0) {
108         SHARING_LOGE("avformat_open_input failed.");
109         return;
110     }
111 
112     for (int32_t i = 0; i < static_cast<int32_t>(avFormatContext_->nb_streams); i++) {
113         if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
114             videoStreamIndex_ = i;
115             SHARING_LOGD("find video stream %{public}u.", i);
116         } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
117             audioStreamIndex_ = i;
118             SHARING_LOGD("find audio stream %{public}u.", i);
119         }
120     }
121 
122     AVPacket *packet = av_packet_alloc();
123     while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
124         if (exit_) {
125             SHARING_LOGI("ignore av read frame.");
126             break;
127         }
128         if (packet->stream_index == videoStreamIndex_) {
129             SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
130                 if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
131                     return;
132                 }
133                 auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
134                                                             (uint32_t)packet->pts, prefix);
135                 std::lock_guard<std::mutex> lock(frameLock);
136                 if (onFrame_) {
137                     onFrame_(outFrame);
138                 }
139             });
140         } else if (packet->stream_index == audioStreamIndex_) {
141             auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
142                                                        (uint32_t)packet->pts);
143             std::lock_guard<std::mutex> lock(frameLock);
144             if (onFrame_) {
145                 onFrame_(outFrame);
146             }
147         }
148         av_packet_unref(packet);
149     }
150 
151     av_packet_free(&packet);
152     SHARING_LOGD("ts decoding Thread_ exit.");
153 }
154 
StaticReadPacket(void * opaque,uint8_t * buf,int buf_size)155 int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
156 {
157     RETURN_INVALID_IF_NULL(opaque);
158     RETURN_INVALID_IF_NULL(buf);
159     RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
160     if (decoder == nullptr) {
161         SHARING_LOGE("decoder is nullptr.");
162         return 0;
163     }
164     return decoder->ReadPacket(buf, buf_size);
165 }
166 
ReadPacket(uint8_t * buf,int buf_size)167 int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
168 {
169     RETURN_INVALID_IF_NULL(buf);
170     while (dataQueue_.empty()) {
171         if (exit_ == true) {
172             SHARING_LOGI("read packet exit.");
173             return 0;
174         }
175         std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait times
176     }
177 
178     std::unique_lock<std::mutex> lock(queueMutex_);
179     auto &rtp = dataQueue_.front();
180     auto data = rtp->GetPayload();
181     int length = static_cast<int>(rtp->GetPayloadSize());
182     if (length > buf_size) {
183         SHARING_LOGE("rtp length exceed buf_size!");
184         return 0;
185     }
186     auto ret = memcpy_s(buf, length, data, length);
187     if (ret != EOK) {
188         return 0;
189     }
190 
191     dataQueue_.pop();
192     return length;
193 }
194 
RtpEncoderTs(uint32_t ssrc,uint32_t mtuSize,uint32_t sampleRate,uint8_t payloadType,uint16_t seq)195 RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
196     : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
197 {
198     SHARING_LOGI("RtpEncoderTs CTOR IN");
199     merger_.SetType(FrameMerger::H264_PREFIX);
200 }
201 
~RtpEncoderTs()202 RtpEncoderTs::~RtpEncoderTs()
203 {
204     SHARING_LOGI("RtpEncoderTs DTOR IN");
205     Release();
206 }
207 
Release()208 void RtpEncoderTs::Release()
209 {
210     SHARING_LOGD("trace.");
211     {
212         std::lock_guard<std::mutex> lock(cbLockMutex_);
213         onRtpPack_ = nullptr;
214     }
215     exit_ = true;
216     if (encodeThread_ && encodeThread_->joinable()) {
217         encodeThread_->join();
218         encodeThread_ = nullptr;
219     }
220 
221     {
222         std::lock_guard<std::mutex> lock(queueMutex_);
223         while (!dataQueue_.empty()) {
224             dataQueue_.pop();
225         }
226     }
227 
228     if (avioContext_) {
229         avio_context_free(&avioContext_);
230     }
231 
232     if (avioCtxBuffer_) {
233         av_free(avioCtxBuffer_);
234         avioCtxBuffer_ = nullptr;
235     }
236 
237     if (avFormatContext_) {
238         avformat_free_context(avFormatContext_);
239         avFormatContext_ = nullptr;
240     }
241 }
242 
InputFrame(const Frame::Ptr & frame)243 void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
244 {
245     RETURN_IF_NULL(frame);
246     if (exit_) {
247         return;
248     }
249 
250     DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
251     switch (frame->GetCodecId()) {
252         case CODEC_H264:
253             // merge sps, pps and key frame into one packet
254             merger_.InputFrame(
255                 frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
256                     auto outFrame =
257                         std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
258                                                     PrefixSize((char *)buffer->Data(), buffer->Size()));
259                     SaveFrame(outFrame);
260                 });
261             break;
262         case CODEC_AAC:
263         case CODEC_PCM:
264             SaveFrame(frame);
265             break;
266         default:
267             SHARING_LOGW("Unknown codec: %d", frame->GetCodecId());
268             break;
269     }
270 
271     if (audioCodeId_ == AV_CODEC_ID_NONE) {
272         if (frame->GetCodecId() == CODEC_AAC) {
273             audioCodeId_ = AV_CODEC_ID_AAC;
274         } else if (frame->GetCodecId() == CODEC_PCM) {
275             audioCodeId_ = AV_CODEC_ID_PCM_S16BE;
276         }
277     }
278     if (encodeThread_ == nullptr && audioCodeId_ != AV_CODEC_ID_NONE) {
279         encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
280     }
281 }
282 
SetOnRtpPack(const OnRtpPack & cb)283 void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
284 {
285     SHARING_LOGD("trace.");
286     onRtpPack_ = cb;
287 }
288 
StartEncoding()289 void RtpEncoderTs::StartEncoding()
290 {
291     SHARING_LOGD("trace.");
292     avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
293     if (avFormatContext_ == nullptr) {
294         SHARING_LOGE("avformat_alloc_output_context2 failed.");
295         return;
296     }
297 
298     videoStream = avformat_new_stream(avFormatContext_, NULL);
299     videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
300     videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
301     videoStream->codecpar->codec_tag = 0;
302     videoStream->time_base.num = 1;
303     videoStream->time_base.den = SAMPLE_RATE_90K; // 90000: video sample rate
304 
305     audioStream = avformat_new_stream(avFormatContext_, NULL);
306     audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
307     audioStream->codecpar->codec_id = audioCodeId_;
308     audioStream->codecpar->codec_tag = 0;
309     audioStream->codecpar->channel_layout = (uint64_t)av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
310     audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
311     audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
312     audioStream->time_base.num = 1;
313     audioStream->time_base.den = AUDIO_SAMPLE_RATE_48000;
314     SHARING_LOGI("audio stream id: %{public}d, video stream id: %{public}d, audio codecid: %{public}d.",
315         audioStream->index, videoStream->index, audioCodeId_);
316 
317     avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
318     avioContext_ =
319         avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
320     if (avioContext_ == nullptr) {
321         SHARING_LOGE("avio_alloc_context failed.");
322         return;
323     }
324     avFormatContext_->pb = avioContext_;
325     avFormatContext_->flags = (int32_t)((uint32_t)(avFormatContext_->flags) | AVFMT_FLAG_CUSTOM_IO);
326 
327     int32_t ret = avformat_write_header(avFormatContext_, NULL);
328     if (ret < 0) {
329         SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
330         return;
331     }
332 
333     while (!exit_) {
334         AVPacket *packet = av_packet_alloc();
335         auto frame = ReadFrame(packet);
336         if (frame == nullptr) {
337             break;
338         }
339         av_write_frame(avFormatContext_, packet);
340     }
341 
342     av_write_trailer(avFormatContext_);
343 }
344 
SaveFrame(Frame::Ptr frame)345 void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
346 {
347     RETURN_IF_NULL(frame);
348     std::lock_guard<std::mutex> lock(queueMutex_);
349     dataQueue_.emplace(frame);
350 }
351 
ReadFrame(AVPacket * packet)352 Frame::Ptr RtpEncoderTs::ReadFrame(AVPacket *packet)
353 {
354     if (packet == nullptr) {
355         SHARING_LOGE("packet is null!");
356         return nullptr;
357     }
358     while (dataQueue_.empty()) {
359         if (exit_ == true) {
360             SHARING_LOGI("exit when read frame.");
361             return nullptr;
362         }
363         std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait time
364     }
365 
366     std::unique_lock<std::mutex> lock(queueMutex_);
367     Frame::Ptr frame = dataQueue_.front();
368     dataQueue_.pop();
369     packet->data = frame->Data();
370     packet->size = frame->Size();
371 
372     keyFrame_ = frame->KeyFrame();
373     if (frame->GetTrackType() == TRACK_VIDEO) {
374         packet->dts = av_rescale(frame->Dts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
375         packet->pts = av_rescale(frame->Pts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
376         packet->stream_index = videoStream->index;
377         timeStamp_ = frame->Dts();
378     } else if (frame->GetTrackType() == TRACK_AUDIO) {
379         packet->dts = av_rescale(frame->Dts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
380         packet->pts = av_rescale(frame->Pts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
381         packet->stream_index = audioStream->index;
382         timeStamp_ = frame->Dts();
383     }
384     return frame;
385 }
386 
RemoveFrameAfterMuxing()387 void RtpEncoderTs::RemoveFrameAfterMuxing()
388 {
389     std::unique_lock<std::mutex> lock(queueMutex_);
390     dataQueue_.pop();
391 }
392 
WritePacket(void * opaque,uint8_t * buf,int buf_size)393 int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
394 {
395     RETURN_INVALID_IF_NULL(opaque);
396     RETURN_INVALID_IF_NULL(buf);
397 
398     RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
399     std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
400     if (encoder->onRtpPack_) {
401         auto rtp =
402             encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
403         encoder->onRtpPack_(rtp);
404     }
405 
406     return 0;
407 }
408 } // namespace Sharing
409 } // namespace OHOS
410