/*
 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "rtp_codec_ts.h"
#include <securec.h>
#include "common/common_macro.h"
#include "common/media_log.h"
#include "frame/aac_frame.h"
#include "frame/h264_frame.h"

namespace OHOS {
namespace Sharing {
constexpr int32_t FF_BUFFER_SIZE = 1500;
constexpr int32_t MAX_RTP_PAYLOAD_SIZE = 1316;
static std::mutex frameLock;

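// RtpDecoderTs: queues the MPEG-TS payload of each incoming RTP packet and demuxes it on a worker
// thread through a custom-IO FFmpeg context (StaticReadPacket/ReadPacket). Demuxed video packets
// are split into H.264 frames, audio packets become AAC frames, and both are delivered through
// onFrame_, which is guarded by frameLock.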
RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}

RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}

void RtpDecoderTs::Release()
{
    {
        std::lock_guard<std::mutex> lock(frameLock);
        onFrame_ = nullptr;
    }

    exit_ = true;
    if (decodeThread_ && decodeThread_->joinable()) {
        decodeThread_->join();
        decodeThread_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_close_input(&avFormatContext_);
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }
}

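// Queues one RTP packet's TS payload for the demuxing thread; the thread is started lazily when
// the first packet arrives.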
void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
{
    MEDIA_LOGD("trace.");
    RETURN_IF_NULL(rtp);
    if (exit_) {
        return;
    }

    if (decodeThread_ == nullptr) {
        decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
    }

    auto payload_size = rtp->GetPayloadSize();
    if (payload_size <= 0) {
        return;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(rtp);
}

void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
{
    // onFrame_ is read by the decode thread under frameLock, so update it under the same lock.
    std::lock_guard<std::mutex> lock(frameLock);
    onFrame_ = cb;
}

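// Demuxing thread entry: builds an AVFormatContext that reads TS data via StaticReadPacket,
// probes the audio/video streams, then loops on av_read_frame() and forwards the resulting
// frames to onFrame_ until Release() sets exit_.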
void RtpDecoderTs::StartDecoding()
{
    SHARING_LOGE("trace.");
    avFormatContext_ = avformat_alloc_context();
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_context failed.");
        return;
    }

    avioCtxBuffer_ = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }

    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;

    int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
    if (ret != 0) {
        SHARING_LOGE("avformat_open_input failed.");
        return;
    }

    for (uint32_t i = 0; i < avFormatContext_->nb_streams; i++) {
        if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
            videoStreamIndex_ = i;
            SHARING_LOGD("find video stream %{public}u.", i);
        } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            audioStreamIndex_ = i;
            SHARING_LOGD("find audio stream %{public}u.", i);
        }
    }

    AVPacket *packet = av_packet_alloc();
    while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
        if (exit_) {
            SHARING_LOGI("ignore av read frame.");
            break;
        }
        if (packet->stream_index == videoStreamIndex_) {
            SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
                if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
                    return;
                }
                auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
                                                            (uint32_t)packet->pts, prefix);
                std::lock_guard<std::mutex> lock(frameLock);
                if (onFrame_) {
                    onFrame_(outFrame);
                }
            });
        } else if (packet->stream_index == audioStreamIndex_) {
            auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
                                                       (uint32_t)packet->pts);
            std::lock_guard<std::mutex> lock(frameLock);
            if (onFrame_) {
                onFrame_(outFrame);
            }
        }
        av_packet_unref(packet);
    }

    av_packet_free(&packet);
    SHARING_LOGD("ts decoding Thread_ exit.");
}

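// AVIO read trampoline: forwards FFmpeg's read request to the RtpDecoderTs instance passed as opaque.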
int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);
    RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
    if (decoder == nullptr) {
        SHARING_LOGE("decoder is nullptr.");
        return 0;
    }
    return decoder->ReadPacket(buf, buf_size);
}

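// Blocks until a queued RTP payload is available (or exit_ is set), then copies one payload into
// FFmpeg's buffer and returns the number of bytes copied.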
int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(buf);
    while (dataQueue_.empty()) {
        if (exit_ == true) {
            SHARING_LOGI("read packet exit.");
            return 0;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait times
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    auto &rtp = dataQueue_.front();
    auto data = rtp->GetPayload();
    int length = rtp->GetPayloadSize();

    // destMax must describe the capacity of FFmpeg's buffer, not the payload length.
    auto ret = memcpy_s(buf, buf_size, data, length);
    if (ret != EOK) {
        return 0;
    }

    dataQueue_.pop();
    return length;
}

RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
    : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
{
    SHARING_LOGD("RtpEncoderTs CTOR IN");
    merger_.SetType(FrameMerger::H264_PREFIX);
}

RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGD("RtpEncoderTs DTOR IN");
    Release();
}

void RtpEncoderTs::Release()
{
    SHARING_LOGD("trace.");
    {
        std::lock_guard<std::mutex> lock(cbLockMutex_);
        onRtpPack_ = nullptr;
    }
    exit_ = true;
    if (encodeThread_ && encodeThread_->joinable()) {
        encodeThread_->join();
        encodeThread_ = nullptr;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!dataQueue_.empty()) {
            dataQueue_.pop();
        }
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_free_context(avFormatContext_);
        avFormatContext_ = nullptr;
    }
}

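// Accepts encoder input: H.264 frames are first merged (SPS/PPS plus key frame) by merger_, AAC
// frames are queued directly; the muxing thread is started lazily after the first frame.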
void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
{
    RETURN_IF_NULL(frame);
    if (exit_) {
        return;
    }

    DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
    switch (frame->GetCodecId()) {
        case CODEC_H264:
            // merge sps, pps and key frame into one packet
            merger_.InputFrame(
                frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
                    auto outFrame =
                        std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
                                                    PrefixSize((char *)buffer->Data(), buffer->Size()));
                    SaveFrame(outFrame);
                });
            break;
        case CODEC_AAC:
            SaveFrame(frame);
            break;
        default:
            SHARING_LOGW("Unknown codec: %{public}d", frame->GetCodecId());
            break;
    }

    if (encodeThread_ == nullptr) {
        encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
    }
}

void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
{
    SHARING_LOGD("trace.");
    // onRtpPack_ is read by WritePacket() under cbLockMutex_, so update it under the same lock.
    std::lock_guard<std::mutex> lock(cbLockMutex_);
    onRtpPack_ = cb;
}

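// Muxing thread entry: sets up an mpegts muxer whose output goes through a custom AVIO write
// callback, so muxed TS data is flushed to WritePacket() in RTP-sized chunks (MAX_RTP_PAYLOAD_SIZE)
// and packed into RTP there.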
void RtpEncoderTs::StartEncoding()
{
    SHARING_LOGD("trace.");
    avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_output_context2 failed.");
        return;
    }

    videoStream = avformat_new_stream(avFormatContext_, NULL);
    videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
    videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
    videoStream->codecpar->codec_tag = 0;

    audioStream = avformat_new_stream(avFormatContext_, NULL);
    audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
    audioStream->codecpar->codec_id = AV_CODEC_ID_AAC;
    audioStream->codecpar->codec_tag = 0;
    audioStream->codecpar->channel_layout = av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
    audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
    audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;

    SHARING_LOGD("audio stream id: %{public}d, video stream id: %{public}d.",
        audioStream->index, videoStream->index);

    avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }
    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;
    avFormatContext_->flags |= AVFMT_FLAG_CUSTOM_IO;

    int32_t ret = avformat_write_header(avFormatContext_, NULL);
    if (ret < 0) {
        SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
        return;
    }

    // Reuse one AVPacket for the whole loop; packet->data borrows the queued frame's buffer,
    // so av_packet_unref() only resets the fields and never frees the frame data.
    AVPacket *packet = av_packet_alloc();
    while (!exit_) {
        if (ReadFrame(packet) < 0) {
            break;
        }
        av_write_frame(avFormatContext_, packet);
        av_packet_unref(packet);
        RemoveFrameAfterMuxing();
    }
    av_packet_free(&packet);

    av_write_trailer(avFormatContext_);
}

void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
{
    RETURN_IF_NULL(frame);
    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(frame);
}

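// Waits for a queued frame (or exit_), then maps its buffer, timestamps and stream index onto the
// AVPacket without copying; the frame stays queued until RemoveFrameAfterMuxing() so the data
// remains valid while av_write_frame() runs.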
int RtpEncoderTs::ReadFrame(AVPacket *packet)
{
    RETURN_INVALID_IF_NULL(packet);
    while (dataQueue_.empty()) {
        if (exit_ == true) {
            SHARING_LOGI("exit when read frame.");
            return -1;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait times
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    Frame::Ptr frame = dataQueue_.front();
    packet->data = frame->Data();
    packet->size = frame->Size();
    packet->pts = frame->Pts();
    packet->dts = frame->Dts();
    keyFrame_ = frame->KeyFrame();
    timeStamp_ = frame->Dts();

    if (frame->GetTrackType() == TRACK_VIDEO) {
        packet->stream_index = videoStream->index;
    } else if (frame->GetTrackType() == TRACK_AUDIO) {
        packet->stream_index = audioStream->index;
    }
    return 0;
}

void RtpEncoderTs::RemoveFrameAfterMuxing()
{
    std::unique_lock<std::mutex> lock(queueMutex_);
    dataQueue_.pop();
}

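// AVIO write callback: wraps each muxed TS chunk in an RTP packet via MakeRtp() and delivers it
// through onRtpPack_ (guarded by cbLockMutex_).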
int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);
    RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
    std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
    if (encoder->onRtpPack_) {
        auto rtp =
            encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
        encoder->onRtpPack_(rtp);
    }

    return 0;
}
} // namespace Sharing
} // namespace OHOS