• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rtp_codec_ts.h"
17 #include <securec.h>
18 #include "common/common_macro.h"
19 #include "common/media_log.h"
20 #include "frame/aac_frame.h"
21 #include "frame/h264_frame.h"
22 
23 namespace OHOS {
24 namespace Sharing {
// Size in bytes of the I/O buffer handed to the demuxing AVIOContext in RtpDecoderTs::StartDecoding().
constexpr int32_t FF_BUFFER_SIZE = 1500;
// Size in bytes of the I/O buffer handed to the muxing AVIOContext in RtpEncoderTs::StartEncoding().
// NOTE(review): presumably this also bounds the chunk size passed to WritePacket — confirm against libavformat.
constexpr int32_t MAX_RTP_PAYLOAD_SIZE = 1316;
// Guards RtpDecoderTs::onFrame_. Declared at file scope, so one mutex is shared by all RtpDecoderTs instances.
static std::mutex frameLock;
28 
/**
 * Constructs the decoder. Only emits a debug log here; the decode thread is created on the first
 * InputRtp() call and the FFmpeg contexts are created inside StartDecoding().
 */
RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}
33 
/**
 * Destroys the decoder; all teardown (callback reset, thread join, FFmpeg cleanup) is delegated to Release().
 */
RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}
39 
Release()40 void RtpDecoderTs::Release()
41 {
42     {
43         std::lock_guard<std::mutex> lock(frameLock);
44         onFrame_ = nullptr;
45     }
46 
47     exit_ = true;
48     if (decodeThread_ && decodeThread_->joinable()) {
49         decodeThread_->join();
50         decodeThread_ = nullptr;
51     }
52 
53     if (avFormatContext_) {
54         avformat_close_input(&avFormatContext_);
55     }
56 
57     if (avioContext_) {
58         avio_context_free(&avioContext_);
59     }
60 
61     if (avioCtxBuffer_) {
62         av_free(avioCtxBuffer_);
63         avioCtxBuffer_ = nullptr;
64     }
65 }
66 
InputRtp(const RtpPacket::Ptr & rtp)67 void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
68 {
69     MEDIA_LOGD("trace.");
70     RETURN_IF_NULL(rtp);
71     if (exit_) {
72         return;
73     }
74 
75     if (decodeThread_ == nullptr) {
76         decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
77     }
78 
79     auto payload_size = rtp->GetPayloadSize();
80     if (payload_size <= 0) {
81         return;
82     }
83 
84     std::lock_guard<std::mutex> lock(queueMutex_);
85     dataQueue_.emplace(rtp);
86 }
87 
SetOnFrame(const OnFrame & cb)88 void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
89 {
90     onFrame_ = cb;
91 }
92 
StartDecoding()93 void RtpDecoderTs::StartDecoding()
94 {
95     SHARING_LOGE("trace.");
96     avFormatContext_ = avformat_alloc_context();
97     if (avFormatContext_ == nullptr) {
98         SHARING_LOGE("avformat_alloc_context failed.");
99         return;
100     }
101 
102     avioCtxBuffer_ = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
103 
104     avioContext_ =
105         avio_alloc_context(avioCtxBuffer_, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
106     if (avioContext_ == nullptr) {
107         SHARING_LOGE("avio_alloc_context failed.");
108         return;
109     }
110     avFormatContext_->pb = avioContext_;
111 
112     int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
113     if (ret != 0) {
114         SHARING_LOGE("avformat_open_input failed.");
115         return;
116     }
117 
118     for (uint32_t i = 0; i < avFormatContext_->nb_streams; i++) {
119         if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
120             videoStreamIndex_ = i;
121             SHARING_LOGD("find video stream %{public}u.", i);
122         } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
123             audioStreamIndex_ = i;
124             SHARING_LOGD("find audio stream %{public}u.", i);
125         }
126     }
127 
128     AVPacket *packet = av_packet_alloc();
129     while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
130         if (exit_) {
131             SHARING_LOGI("ignore av read frame.");
132             break;
133         }
134         if (packet->stream_index == videoStreamIndex_) {
135             SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
136                 if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
137                     return;
138                 }
139                 auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
140                                                             (uint32_t)packet->pts, prefix);
141                 std::lock_guard<std::mutex> lock(frameLock);
142                 if (onFrame_) {
143                     onFrame_(outFrame);
144                 }
145             });
146         } else if (packet->stream_index == audioStreamIndex_) {
147             auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
148                                                        (uint32_t)packet->pts);
149             std::lock_guard<std::mutex> lock(frameLock);
150             if (onFrame_) {
151                 onFrame_(outFrame);
152             }
153         }
154         av_packet_unref(packet);
155     }
156 
157     av_packet_free(&packet);
158     SHARING_LOGD("ts decoding Thread_ exit.");
159 }
160 
StaticReadPacket(void * opaque,uint8_t * buf,int buf_size)161 int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
162 {
163     RETURN_INVALID_IF_NULL(opaque);
164     RETURN_INVALID_IF_NULL(buf);
165     RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
166     if (decoder == nullptr) {
167         SHARING_LOGE("decoder is nullptr.");
168         return 0;
169     }
170     return decoder->ReadPacket(buf, buf_size);
171 }
172 
ReadPacket(uint8_t * buf,int buf_size)173 int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
174 {
175     RETURN_INVALID_IF_NULL(buf);
176     while (dataQueue_.empty()) {
177         if (exit_ == true) {
178             SHARING_LOGI("read packet exit.");
179             return 0;
180         }
181         std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait times
182     }
183 
184     std::unique_lock<std::mutex> lock(queueMutex_);
185     auto &rtp = dataQueue_.front();
186     auto data = rtp->GetPayload();
187     int length = rtp->GetPayloadSize();
188 
189     auto ret = memcpy_s(buf, length, data, length);
190     if (ret != EOK) {
191         return 0;
192     }
193 
194     dataQueue_.pop();
195     return length;
196 }
197 
RtpEncoderTs(uint32_t ssrc,uint32_t mtuSize,uint32_t sampleRate,uint8_t payloadType,uint16_t seq)198 RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
199     : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
200 {
201     SHARING_LOGD("RtpEncoderTs CTOR IN");
202     merger_.SetType(FrameMerger::H264_PREFIX);
203 }
204 
/**
 * Destroys the encoder; all teardown (callback reset, thread join, queue and FFmpeg cleanup) is done by Release().
 */
RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGD("RtpEncoderTs DTOR IN");
    Release();
}
210 
Release()211 void RtpEncoderTs::Release()
212 {
213     SHARING_LOGD("trace.");
214     {
215         std::lock_guard<std::mutex> lock(cbLockMutex_);
216         onRtpPack_ = nullptr;
217     }
218     exit_ = true;
219     if (encodeThread_ && encodeThread_->joinable()) {
220         encodeThread_->join();
221         encodeThread_ = nullptr;
222     }
223 
224     {
225         std::lock_guard<std::mutex> lock(queueMutex_);
226         while (!dataQueue_.empty()) {
227             dataQueue_.pop();
228         }
229     }
230 
231     if (avioContext_) {
232         avio_context_free(&avioContext_);
233     }
234 
235     if (avioCtxBuffer_) {
236         av_free(avioCtxBuffer_);
237         avioCtxBuffer_ = nullptr;
238     }
239 
240     if (avFormatContext_) {
241         avformat_free_context(avFormatContext_);
242         avFormatContext_ = nullptr;
243     }
244 }
245 
InputFrame(const Frame::Ptr & frame)246 void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
247 {
248     RETURN_IF_NULL(frame);
249     if (exit_) {
250         return;
251     }
252 
253     DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
254     switch (frame->GetCodecId()) {
255         case CODEC_H264:
256             // merge sps, pps and key frame into one packet
257             merger_.InputFrame(
258                 frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
259                     auto outFrame =
260                         std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
261                                                     PrefixSize((char *)buffer->Data(), buffer->Size()));
262                     SaveFrame(outFrame);
263                 });
264             break;
265         case CODEC_AAC:
266             SaveFrame(frame);
267             break;
268         default:
269             SHARING_LOGW("Unknown codec: %d", frame->GetCodecId());
270             break;
271     }
272 
273     if (encodeThread_ == nullptr) {
274         encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
275     }
276 }
277 
SetOnRtpPack(const OnRtpPack & cb)278 void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
279 {
280     SHARING_LOGD("trace.");
281     onRtpPack_ = cb;
282 }
283 
StartEncoding()284 void RtpEncoderTs::StartEncoding()
285 {
286     SHARING_LOGD("trace.");
287     avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
288     if (avFormatContext_ == nullptr) {
289         SHARING_LOGE("avformat_alloc_output_context2 failed.");
290         return;
291     }
292 
293     videoStream = avformat_new_stream(avFormatContext_, NULL);
294     videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
295     videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
296     videoStream->codecpar->codec_tag = 0;
297 
298     audioStream = avformat_new_stream(avFormatContext_, NULL);
299     audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
300     audioStream->codecpar->codec_id = AV_CODEC_ID_AAC;
301     audioStream->codecpar->codec_tag = 0;
302     audioStream->codecpar->channel_layout = av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
303     audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
304     audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
305 
306     SHARING_LOGD("audio stream id: %{public}d, video stream id: %{public}d.",
307         audioStream->index, videoStream->index);
308 
309     avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
310     avioContext_ =
311         avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
312     if (avioContext_ == nullptr) {
313         SHARING_LOGE("avio_alloc_context failed.");
314         return;
315     }
316     avFormatContext_->pb = avioContext_;
317     avFormatContext_->flags |= AVFMT_FLAG_CUSTOM_IO;
318 
319     int32_t ret = avformat_write_header(avFormatContext_, NULL);
320     if (ret < 0) {
321         SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
322         return;
323     }
324 
325     while (!exit_) {
326         AVPacket *packet = av_packet_alloc();
327         if (ReadFrame(packet) < 0) {
328             break;
329         }
330         av_write_frame(avFormatContext_, packet);
331         RemoveFrameAfterMuxing();
332     }
333 
334     av_write_trailer(avFormatContext_);
335 }
336 
SaveFrame(Frame::Ptr frame)337 void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
338 {
339     RETURN_IF_NULL(frame);
340     std::lock_guard<std::mutex> lock(queueMutex_);
341     dataQueue_.emplace(frame);
342 }
343 
ReadFrame(AVPacket * packet)344 int RtpEncoderTs::ReadFrame(AVPacket *packet)
345 {
346     RETURN_INVALID_IF_NULL(packet);
347     while (dataQueue_.empty()) {
348         if (exit_ == true) {
349             SHARING_LOGI("exit when read frame.");
350             return -1;
351         }
352         std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait times
353     }
354 
355     std::unique_lock<std::mutex> lock(queueMutex_);
356     Frame::Ptr frame = dataQueue_.front();
357     packet->data = frame->Data();
358     packet->size = frame->Size();
359     packet->pts = frame->Pts();
360     packet->dts = frame->Dts();
361     keyFrame_ = frame->KeyFrame();
362     timeStamp_ = frame->Dts();
363 
364     if (frame->GetTrackType() == TRACK_VIDEO) {
365         packet->stream_index = videoStream->index;
366     } else if (frame->GetTrackType() == TRACK_AUDIO) {
367         packet->stream_index = audioStream->index;
368     }
369     return 0;
370 }
371 
RemoveFrameAfterMuxing()372 void RtpEncoderTs::RemoveFrameAfterMuxing()
373 {
374     std::unique_lock<std::mutex> lock(queueMutex_);
375     dataQueue_.pop();
376 }
377 
WritePacket(void * opaque,uint8_t * buf,int buf_size)378 int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
379 {
380     RETURN_INVALID_IF_NULL(opaque);
381     RETURN_INVALID_IF_NULL(buf);
382     RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
383     std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
384     if (encoder->onRtpPack_) {
385         auto rtp =
386             encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
387         encoder->onRtpPack_(rtp);
388     }
389 
390     return 0;
391 }
392 } // namespace Sharing
393 } // namespace OHOS
394