1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rtp_codec_ts.h"
17 #include <securec.h>
18 #include "common/common_macro.h"
19 #include "common/media_log.h"
20 #include "frame/aac_frame.h"
21 #include "frame/h264_frame.h"
22
23 namespace OHOS {
24 namespace Sharing {
// Size of the buffer handed to FFmpeg's custom AVIO layer (roughly one MTU).
constexpr int32_t FF_BUFFER_SIZE = 1500;
// Serializes access to onFrame_ between the decode thread and
// Release()/SetOnFrame().
// NOTE(review): file-static, so this single mutex is shared by ALL
// RtpDecoderTs instances — confirm that is intended.
static std::mutex frameLock;
27
// Default constructor. Decoding starts lazily on the first InputRtp() call.
RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}
32
// Destructor: stops the decode thread and frees FFmpeg resources via Release().
RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}
38
// Stop decoding and release all FFmpeg resources.
// Teardown order matters: clear the frame callback first (so the decode
// thread stops delivering frames), then signal exit_ and join the thread,
// and only then free the demuxer/AVIO objects the thread was using.
void RtpDecoderTs::Release()
{
    {
        std::lock_guard<std::mutex> lock(frameLock);
        onFrame_ = nullptr;
    }

    exit_ = true;
    if (decodeThread_ && decodeThread_->joinable()) {
        decodeThread_->join();
        decodeThread_ = nullptr;
    }

    // Closes and frees the demuxer allocated in StartDecoding().
    if (avFormatContext_) {
        avformat_close_input(&avFormatContext_);
    }

    // avio_context_free() does not free the I/O buffer, so free the buffer
    // first, then the context (canonical FFmpeg custom-AVIO teardown).
    if (avioContext_) {
        av_freep(&avioContext_->buffer);
        avio_context_free(&avioContext_);
    }
}
61
InputRtp(const RtpPacket::Ptr & rtp)62 void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
63 {
64 MEDIA_LOGD("trace.");
65 RETURN_IF_NULL(rtp);
66 if (exit_) {
67 return;
68 }
69
70 if (decodeThread_ == nullptr) {
71 decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
72 }
73
74 auto payload_size = rtp->GetPayloadSize();
75 if (payload_size <= 0) {
76 return;
77 }
78
79 std::lock_guard<std::mutex> lock(queueMutex_);
80 dataQueue_.emplace(rtp);
81 }
82
SetOnFrame(const OnFrame & cb)83 void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
84 {
85 onFrame_ = cb;
86 }
87
StartDecoding()88 void RtpDecoderTs::StartDecoding()
89 {
90 SHARING_LOGE("trace.");
91 avFormatContext_ = avformat_alloc_context();
92 if (avFormatContext_ == nullptr) {
93 SHARING_LOGE("avformat_alloc_context failed.");
94 return;
95 }
96
97 auto buffer = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
98 if (buffer == nullptr) {
99 SHARING_LOGE("av_malloc failed.");
100 return;
101 }
102 avioContext_ =
103 avio_alloc_context(buffer, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
104 if (avioContext_ == nullptr) {
105 SHARING_LOGE("avio_alloc_context failed.");
106 av_freep(&buffer);
107 return;
108 }
109 avFormatContext_->pb = avioContext_;
110
111 int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
112 if (ret != 0) {
113 SHARING_LOGE("avformat_open_input failed.");
114 return;
115 }
116
117 AVRational videoTimeBase;
118 AVRational audioTimeBase;
119 AVRational PtsTimeBase = {1, US_PER_SEC};
120
121 for (int32_t i = 0; i < static_cast<int32_t>(avFormatContext_->nb_streams); i++) {
122 if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
123 videoTimeBase = avFormatContext_->streams[i]->time_base;
124 if (videoTimeBase.den == 0) {
125 videoTimeBase = AV_TIME_BASE_Q;
126 }
127 videoStreamIndex_ = i;
128 SHARING_LOGD("find video stream %{public}u.", i);
129 } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
130 audioTimeBase = avFormatContext_->streams[i]->time_base;
131 if (audioTimeBase.den == 0) {
132 audioTimeBase = AV_TIME_BASE_Q;
133 }
134 audioStreamIndex_ = i;
135 SHARING_LOGD("find audio stream %{public}u.", i);
136 }
137 }
138
139 AVPacket *packet = av_packet_alloc();
140 if (packet == nullptr) {
141 av_freep(&buffer);
142 return;
143 }
144 while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
145 if (exit_) {
146 SHARING_LOGI("ignore av read frame.");
147 break;
148 }
149 if (packet->stream_index == videoStreamIndex_) {
150 SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
151 if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
152 return;
153 }
154 int64_t ptsUsec = av_rescale_q(packet->pts, videoTimeBase, PtsTimeBase);
155 auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
156 (uint64_t)ptsUsec, prefix);
157 std::lock_guard<std::mutex> lock(frameLock);
158 if (onFrame_) {
159 onFrame_(outFrame);
160 }
161 });
162 } else if (packet->stream_index == audioStreamIndex_) {
163 int64_t ptsUsec = av_rescale_q(packet->pts, audioTimeBase, PtsTimeBase);
164 auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
165 (uint64_t)ptsUsec);
166 std::lock_guard<std::mutex> lock(frameLock);
167 if (onFrame_) {
168 onFrame_(outFrame);
169 }
170 }
171 av_packet_unref(packet);
172 }
173
174 av_packet_free(&packet);
175 SHARING_LOGD("ts decoding Thread_ exit.");
176 }
177
StaticReadPacket(void * opaque,uint8_t * buf,int buf_size)178 int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
179 {
180 RETURN_INVALID_IF_NULL(opaque);
181 RETURN_INVALID_IF_NULL(buf);
182 RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
183 if (decoder == nullptr) {
184 SHARING_LOGE("decoder is nullptr.");
185 return 0;
186 }
187 return decoder->ReadPacket(buf, buf_size);
188 }
189
ReadPacket(uint8_t * buf,int buf_size)190 int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
191 {
192 RETURN_INVALID_IF_NULL(buf);
193 while (dataQueue_.empty()) {
194 if (exit_ == true) {
195 SHARING_LOGI("read packet exit.");
196 return 0;
197 }
198 std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait times
199 }
200
201 std::unique_lock<std::mutex> lock(queueMutex_);
202 auto &rtp = dataQueue_.front();
203 auto data = rtp->GetPayload();
204 if (data == nullptr) {
205 SHARING_LOGE("payload null");
206 return 0;
207 }
208 int length = static_cast<int>(rtp->GetPayloadSize());
209 if (length > buf_size) {
210 SHARING_LOGE("rtp length exceed buf_size!");
211 return 0;
212 }
213 auto ret = memcpy_s(buf, length, data, length);
214 if (ret != EOK) {
215 return 0;
216 }
217
218 dataQueue_.pop();
219 return length;
220 }
221
// Construct an MPEG-TS RTP packetizer; RTP header parameters (ssrc, MTU,
// payload type, sample rate, initial sequence number) are handled by the
// RtpMaker base class. The merger combines SPS/PPS and the key frame into
// one H.264 frame before muxing (see InputFrame).
RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
    : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
{
    SHARING_LOGI("RtpEncoderTs CTOR IN");
    merger_.SetType(FrameMerger::H264_PREFIX);
}
228
// Destructor: stops the encode thread and frees muxer resources via Release().
RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGI("RtpEncoderTs DTOR IN");
    Release();
}
234
// Stop encoding and release all FFmpeg muxer resources.
// Order: drop the RTP callback (under its lock), signal exit_ and join the
// encode thread, drain the frame queue, then free AVIO/format contexts.
void RtpEncoderTs::Release()
{
    SHARING_LOGD("trace.");
    {
        // WritePacket() on the encode thread reads onRtpPack_ under the same
        // mutex, so clearing it here is race-free.
        std::lock_guard<std::mutex> lock(cbLockMutex_);
        onRtpPack_ = nullptr;
    }
    exit_ = true;
    if (encodeThread_ && encodeThread_->joinable()) {
        encodeThread_->join();
        encodeThread_ = nullptr;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!dataQueue_.empty()) {
            dataQueue_.pop();
        }
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    // NOTE(review): this frees avioCtxBuffer_, the buffer originally passed
    // to avio_alloc_context. If FFmpeg ever replaced the internal buffer,
    // avioContext_->buffer could differ from avioCtxBuffer_ and would leak —
    // the canonical pattern frees avioContext_->buffer instead; confirm
    // against the FFmpeg AVIO documentation.
    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_free_context(avFormatContext_);
        avFormatContext_ = nullptr;
    }
}
269
InputFrame(const Frame::Ptr & frame)270 void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
271 {
272 RETURN_IF_NULL(frame);
273 if (exit_) {
274 return;
275 }
276
277 DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
278 switch (frame->GetCodecId()) {
279 case CODEC_H264:
280 // merge sps, pps and key frame into one packet
281 merger_.InputFrame(
282 frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
283 auto outFrame =
284 std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
285 PrefixSize((char *)buffer->Data(), buffer->Size()));
286 SaveFrame(outFrame);
287 });
288 break;
289 case CODEC_AAC:
290 case CODEC_PCM:
291 SaveFrame(frame);
292 break;
293 default:
294 SHARING_LOGW("Unknown codec: %d", frame->GetCodecId());
295 break;
296 }
297
298 if (audioCodeId_ == AV_CODEC_ID_NONE) {
299 if (frame->GetCodecId() == CODEC_AAC) {
300 audioCodeId_ = AV_CODEC_ID_AAC;
301 } else if (frame->GetCodecId() == CODEC_PCM) {
302 audioCodeId_ = AV_CODEC_ID_PCM_S16BE;
303 }
304 }
305 if (encodeThread_ == nullptr && audioCodeId_ != AV_CODEC_ID_NONE) {
306 encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
307 }
308 }
309
SetOnRtpPack(const OnRtpPack & cb)310 void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
311 {
312 SHARING_LOGD("trace.");
313 onRtpPack_ = cb;
314 }
315
StartEncoding()316 void RtpEncoderTs::StartEncoding()
317 {
318 SHARING_LOGD("trace.");
319 avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
320 if (avFormatContext_ == nullptr) {
321 SHARING_LOGE("avformat_alloc_output_context2 failed.");
322 return;
323 }
324
325 videoStream = avformat_new_stream(avFormatContext_, NULL);
326 videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
327 videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
328 videoStream->codecpar->codec_tag = 0;
329 videoStream->time_base.num = 1;
330 videoStream->time_base.den = SAMPLE_RATE_90K; // 90000: video sample rate
331
332 audioStream = avformat_new_stream(avFormatContext_, NULL);
333 audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
334 audioStream->codecpar->codec_id = audioCodeId_;
335 audioStream->codecpar->codec_tag = 0;
336 audioStream->codecpar->channel_layout = (uint64_t)av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
337 audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
338 audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
339 audioStream->time_base.num = 1;
340 audioStream->time_base.den = AUDIO_SAMPLE_RATE_48000;
341 SHARING_LOGI("audio stream id: %{public}d, video stream id: %{public}d, audio codecid: %{public}d.",
342 audioStream->index, videoStream->index, audioCodeId_);
343
344 avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
345 avioContext_ =
346 avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
347 if (avioContext_ == nullptr) {
348 SHARING_LOGE("avio_alloc_context failed.");
349 return;
350 }
351 avFormatContext_->pb = avioContext_;
352 avFormatContext_->flags = (int32_t)((uint32_t)(avFormatContext_->flags) | AVFMT_FLAG_CUSTOM_IO);
353
354 int32_t ret = avformat_write_header(avFormatContext_, NULL);
355 if (ret < 0) {
356 SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
357 return;
358 }
359
360 while (!exit_) {
361 AVPacket *packet = av_packet_alloc();
362 auto frame = ReadFrame(packet);
363 if (frame == nullptr) {
364 break;
365 }
366 av_write_frame(avFormatContext_, packet);
367 }
368
369 av_write_trailer(avFormatContext_);
370 }
371
SaveFrame(Frame::Ptr frame)372 void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
373 {
374 RETURN_IF_NULL(frame);
375 std::lock_guard<std::mutex> lock(queueMutex_);
376 dataQueue_.emplace(frame);
377 }
378
ReadFrame(AVPacket * packet)379 Frame::Ptr RtpEncoderTs::ReadFrame(AVPacket *packet)
380 {
381 if (packet == nullptr) {
382 SHARING_LOGE("packet is null!");
383 return nullptr;
384 }
385 while (dataQueue_.empty()) {
386 if (exit_ == true) {
387 SHARING_LOGI("exit when read frame.");
388 return nullptr;
389 }
390 std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait time
391 }
392
393 std::unique_lock<std::mutex> lock(queueMutex_);
394 Frame::Ptr frame = dataQueue_.front();
395 dataQueue_.pop();
396 packet->data = frame->Data();
397 packet->size = frame->Size();
398
399 keyFrame_ = frame->KeyFrame();
400 if (frame->GetTrackType() == TRACK_VIDEO) {
401 packet->dts = av_rescale(frame->Dts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
402 packet->pts = av_rescale(frame->Pts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
403 packet->stream_index = videoStream->index;
404 timeStamp_ = frame->Dts();
405 } else if (frame->GetTrackType() == TRACK_AUDIO) {
406 packet->dts = av_rescale(frame->Dts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
407 packet->pts = av_rescale(frame->Pts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
408 packet->stream_index = audioStream->index;
409 timeStamp_ = frame->Dts();
410 }
411 return frame;
412 }
413
RemoveFrameAfterMuxing()414 void RtpEncoderTs::RemoveFrameAfterMuxing()
415 {
416 std::unique_lock<std::mutex> lock(queueMutex_);
417 dataQueue_.pop();
418 }
419
WritePacket(void * opaque,uint8_t * buf,int buf_size)420 int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
421 {
422 RETURN_INVALID_IF_NULL(opaque);
423 RETURN_INVALID_IF_NULL(buf);
424
425 RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
426 std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
427 if (encoder->onRtpPack_) {
428 auto rtp =
429 encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
430 encoder->onRtpPack_(rtp);
431 }
432
433 return 0;
434 }
435 } // namespace Sharing
436 } // namespace OHOS
437