/*
 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "rtp_codec_ts.h"
#include <securec.h>
#include "common/common_macro.h"
#include "common/media_log.h"
#include "frame/aac_frame.h"
#include "frame/h264_frame.h"

namespace OHOS {
namespace Sharing {
constexpr int32_t FF_BUFFER_SIZE = 1500;
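// frameLock guards onFrame_: the demux thread takes it before invoking the callback,
// and Release()/SetOnFrame() take it when clearing or replacing the callback.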
static std::mutex frameLock;

RtpDecoderTs::RtpDecoderTs()
{
    MEDIA_LOGD("RtpDecoderTs CTOR IN.");
}

RtpDecoderTs::~RtpDecoderTs()
{
    SHARING_LOGI("RtpDecoderTs DTOR IN.");
    Release();
}

void RtpDecoderTs::Release()
{
    {
        std::lock_guard<std::mutex> lock(frameLock);
        onFrame_ = nullptr;
    }

    exit_ = true;
    if (decodeThread_ && decodeThread_->joinable()) {
        decodeThread_->join();
        decodeThread_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_close_input(&avFormatContext_);
    }

    if (avioContext_) {
        av_freep(&avioContext_->buffer);
        avio_context_free(&avioContext_);
    }
}

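// Queues one RTP payload for demuxing. The decode thread is created lazily on the first
// packet; packets arriving after Release() has set exit_ are dropped.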
void RtpDecoderTs::InputRtp(const RtpPacket::Ptr &rtp)
{
    MEDIA_LOGD("trace.");
    RETURN_IF_NULL(rtp);
    if (exit_) {
        return;
    }

    if (decodeThread_ == nullptr) {
        decodeThread_ = std::make_unique<std::thread>(&RtpDecoderTs::StartDecoding, this);
    }

    auto payload_size = rtp->GetPayloadSize();
    if (payload_size <= 0) {
        return;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(rtp);
}

void RtpDecoderTs::SetOnFrame(const OnFrame &cb)
{
    std::lock_guard<std::mutex> lock(frameLock);
    onFrame_ = cb;
}

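/*
 * Typical decoder wiring (a sketch; the OnFrame callback signature is assumed from how
 * onFrame_ is invoked below):
 *
 *   auto decoder = std::make_shared<RtpDecoderTs>();
 *   decoder->SetOnFrame([](const Frame::Ptr &frame) {
 *       // consume the demuxed H.264 / AAC frames
 *   });
 *   decoder->InputRtp(rtpPacket); // one call per received TS-over-RTP packet
 */
// Demux loop (runs on decodeThread_): pulls queued RTP payloads into an mpegts demuxer
// through a custom AVIO read callback, then splits the demuxed video packets into NAL
// units (skipping AUDs) and forwards them, together with audio frames, via onFrame_.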
void RtpDecoderTs::StartDecoding()
{
    SHARING_LOGE("trace.");
    avFormatContext_ = avformat_alloc_context();
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_context failed.");
        return;
    }

    auto buffer = (uint8_t *)av_malloc(FF_BUFFER_SIZE);
    if (buffer == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }
    avioContext_ =
        avio_alloc_context(buffer, FF_BUFFER_SIZE, 0, this, &RtpDecoderTs::StaticReadPacket, nullptr, nullptr);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        av_freep(&buffer);
        return;
    }
    avFormatContext_->pb = avioContext_;

    int ret = avformat_open_input(&avFormatContext_, nullptr, nullptr, nullptr);
    if (ret != 0) {
        SHARING_LOGE("avformat_open_input failed.");
        return;
    }

    for (int32_t i = 0; i < static_cast<int32_t>(avFormatContext_->nb_streams); i++) {
        if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
            videoStreamIndex_ = i;
            SHARING_LOGD("find video stream %{public}u.", i);
        } else if (avFormatContext_->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
            audioStreamIndex_ = i;
            SHARING_LOGD("find audio stream %{public}u.", i);
        }
    }

    AVPacket *packet = av_packet_alloc();
    while (!exit_ && av_read_frame(avFormatContext_, packet) >= 0) {
        if (exit_) {
            SHARING_LOGI("ignore av read frame.");
            break;
        }
        if (packet->stream_index == videoStreamIndex_) {
            SplitH264((char *)packet->data, (size_t)packet->size, 0, [&](const char *buf, size_t len, size_t prefix) {
                if (H264_TYPE(buf[prefix]) == H264Frame::NAL_AUD) {
                    return;
                }
                auto outFrame = std::make_shared<H264Frame>((uint8_t *)buf, len, (uint32_t)packet->dts,
                                                            (uint32_t)packet->pts, prefix);
                std::lock_guard<std::mutex> lock(frameLock);
                if (onFrame_) {
                    onFrame_(outFrame);
                }
            });
        } else if (packet->stream_index == audioStreamIndex_) {
            auto outFrame = std::make_shared<AACFrame>((uint8_t *)packet->data, packet->size, (uint32_t)packet->dts,
                                                       (uint32_t)packet->pts);
            std::lock_guard<std::mutex> lock(frameLock);
            if (onFrame_) {
                onFrame_(outFrame);
            }
        }
        av_packet_unref(packet);
    }

    av_packet_free(&packet);
    SHARING_LOGD("ts decoding Thread_ exit.");
}

int RtpDecoderTs::StaticReadPacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);
    RtpDecoderTs *decoder = (RtpDecoderTs *)opaque;
    if (decoder == nullptr) {
        SHARING_LOGE("decoder is nullptr.");
        return 0;
    }
    return decoder->ReadPacket(buf, buf_size);
}

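// AVIO read callback: copies the payload of the front RTP packet in the queue into buf.
// Polls while the queue is empty and returns 0 once Release() has set exit_.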
int RtpDecoderTs::ReadPacket(uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(buf);
    while (dataQueue_.empty()) {
        if (exit_ == true) {
            SHARING_LOGI("read packet exit.");
            return 0;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(5)); // 5: wait time in microseconds
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    auto &rtp = dataQueue_.front();
    auto data = rtp->GetPayload();
    int length = static_cast<int>(rtp->GetPayloadSize());
    if (length > buf_size) {
        SHARING_LOGE("rtp length exceeds buf_size!");
        return 0;
    }
    auto ret = memcpy_s(buf, length, data, length);
    if (ret != EOK) {
        return 0;
    }

    dataQueue_.pop();
    return length;
}

RtpEncoderTs::RtpEncoderTs(uint32_t ssrc, uint32_t mtuSize, uint32_t sampleRate, uint8_t payloadType, uint16_t seq)
    : RtpMaker(ssrc, mtuSize, payloadType, sampleRate, seq)
{
    SHARING_LOGI("RtpEncoderTs CTOR IN");
    merger_.SetType(FrameMerger::H264_PREFIX);
}

RtpEncoderTs::~RtpEncoderTs()
{
    SHARING_LOGI("RtpEncoderTs DTOR IN");
    Release();
}

void RtpEncoderTs::Release()
{
    SHARING_LOGD("trace.");
    {
        std::lock_guard<std::mutex> lock(cbLockMutex_);
        onRtpPack_ = nullptr;
    }
    exit_ = true;
    if (encodeThread_ && encodeThread_->joinable()) {
        encodeThread_->join();
        encodeThread_ = nullptr;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!dataQueue_.empty()) {
            dataQueue_.pop();
        }
    }

    if (avioContext_) {
        avio_context_free(&avioContext_);
    }

    if (avioCtxBuffer_) {
        av_free(avioCtxBuffer_);
        avioCtxBuffer_ = nullptr;
    }

    if (avFormatContext_) {
        avformat_free_context(avFormatContext_);
        avFormatContext_ = nullptr;
    }
}

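// Accepts one elementary-stream frame from upstream. H.264 frames pass through the merger
// so SPS/PPS and the key frame form a single buffer; AAC/PCM frames are queued directly.
// The mux thread is started once the audio codec id has been determined.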
void RtpEncoderTs::InputFrame(const Frame::Ptr &frame)
{
    RETURN_IF_NULL(frame);
    if (exit_) {
        return;
    }

    DataBuffer::Ptr buffer = std::make_shared<DataBuffer>();
    switch (frame->GetCodecId()) {
        case CODEC_H264:
            // merge sps, pps and key frame into one packet
            merger_.InputFrame(
                frame, buffer, [this](uint32_t dts, uint32_t pts, const DataBuffer::Ptr &buffer, bool have_key_frame) {
                    auto outFrame =
                        std::make_shared<H264Frame>(buffer->Data(), buffer->Size(), (uint32_t)dts, (uint32_t)pts,
                                                    PrefixSize((char *)buffer->Data(), buffer->Size()));
                    SaveFrame(outFrame);
                });
            break;
        case CODEC_AAC:
        case CODEC_PCM:
            SaveFrame(frame);
            break;
        default:
            SHARING_LOGW("Unknown codec: %{public}d.", frame->GetCodecId());
            break;
    }

    if (audioCodeId_ == AV_CODEC_ID_NONE) {
        if (frame->GetCodecId() == CODEC_AAC) {
            audioCodeId_ = AV_CODEC_ID_AAC;
        } else if (frame->GetCodecId() == CODEC_PCM) {
            audioCodeId_ = AV_CODEC_ID_PCM_S16BE;
        }
    }
    if (encodeThread_ == nullptr && audioCodeId_ != AV_CODEC_ID_NONE) {
        encodeThread_ = std::make_unique<std::thread>(&RtpEncoderTs::StartEncoding, this);
    }
}

void RtpEncoderTs::SetOnRtpPack(const OnRtpPack &cb)
{
    SHARING_LOGD("trace.");
    std::lock_guard<std::mutex> lock(cbLockMutex_);
    onRtpPack_ = cb;
}

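/*
 * Typical encoder wiring (a sketch; the OnRtpPack callback signature is assumed from how
 * onRtpPack_ is invoked in WritePacket()):
 *
 *   auto encoder = std::make_shared<RtpEncoderTs>(ssrc, mtuSize, SAMPLE_RATE_90K, payloadType, seq);
 *   encoder->SetOnRtpPack([](const RtpPacket::Ptr &rtp) {
 *       // send the TS-over-RTP packet to the peer
 *   });
 *   encoder->InputFrame(frame); // one call per H.264 / AAC / PCM frame
 */
// Mux loop (runs on encodeThread_): builds an in-memory mpegts muxer with one H.264 video
// stream and one audio stream, pulls queued frames through it, and hands the muxed TS
// bytes to WritePacket(), which packs them into RTP.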
void RtpEncoderTs::StartEncoding()
{
    SHARING_LOGD("trace.");
    avformat_alloc_output_context2(&avFormatContext_, NULL, "mpegts", NULL);
    if (avFormatContext_ == nullptr) {
        SHARING_LOGE("avformat_alloc_output_context2 failed.");
        return;
    }

    videoStream = avformat_new_stream(avFormatContext_, NULL);
    if (videoStream == nullptr) {
        SHARING_LOGE("avformat_new_stream for video failed.");
        return;
    }
    videoStream->codecpar->codec_type = AVMEDIA_TYPE_VIDEO;
    videoStream->codecpar->codec_id = AV_CODEC_ID_H264;
    videoStream->codecpar->codec_tag = 0;
    videoStream->time_base.num = 1;
    videoStream->time_base.den = SAMPLE_RATE_90K; // 90000: video sample rate

    audioStream = avformat_new_stream(avFormatContext_, NULL);
    if (audioStream == nullptr) {
        SHARING_LOGE("avformat_new_stream for audio failed.");
        return;
    }
    audioStream->codecpar->codec_type = AVMEDIA_TYPE_AUDIO;
    audioStream->codecpar->codec_id = audioCodeId_;
    audioStream->codecpar->codec_tag = 0;
    audioStream->codecpar->channel_layout = (uint64_t)av_get_default_channel_layout(AUDIO_CHANNEL_STEREO);
    audioStream->codecpar->channels = AUDIO_CHANNEL_STEREO;
    audioStream->codecpar->sample_rate = AUDIO_SAMPLE_RATE_48000;
    audioStream->time_base.num = 1;
    audioStream->time_base.den = AUDIO_SAMPLE_RATE_48000;
    SHARING_LOGI("audio stream id: %{public}d, video stream id: %{public}d, audio codecid: %{public}d.",
        audioStream->index, videoStream->index, audioCodeId_);

    avioCtxBuffer_ = (uint8_t *)av_malloc(MAX_RTP_PAYLOAD_SIZE);
    if (avioCtxBuffer_ == nullptr) {
        SHARING_LOGE("av_malloc failed.");
        return;
    }
    avioContext_ =
        avio_alloc_context(avioCtxBuffer_, MAX_RTP_PAYLOAD_SIZE, 1, this, NULL, &RtpEncoderTs::WritePacket, NULL);
    if (avioContext_ == nullptr) {
        SHARING_LOGE("avio_alloc_context failed.");
        return;
    }
    avFormatContext_->pb = avioContext_;
    avFormatContext_->flags = (int32_t)((uint32_t)(avFormatContext_->flags) | AVFMT_FLAG_CUSTOM_IO);

    int32_t ret = avformat_write_header(avFormatContext_, NULL);
    if (ret < 0) {
        SHARING_LOGE("avformat_write_header to output failed, ret: %{public}d.", ret);
        return;
    }

    while (!exit_) {
        AVPacket *packet = av_packet_alloc();
        if (packet == nullptr) {
            break;
        }
        auto frame = ReadFrame(packet);
        if (frame == nullptr) {
            av_packet_free(&packet);
            break;
        }
        av_write_frame(avFormatContext_, packet);
        av_packet_free(&packet);
    }

    av_write_trailer(avFormatContext_);
}

void RtpEncoderTs::SaveFrame(Frame::Ptr frame)
{
    RETURN_IF_NULL(frame);
    std::lock_guard<std::mutex> lock(queueMutex_);
    dataQueue_.emplace(frame);
}

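// Pops the next queued frame and maps it onto packet without copying: data/size point into
// the frame, and dts/pts are rescaled from the frame clock (WFD_MSEC_IN_SEC) to the target
// stream time base. Returns the frame so the caller keeps it alive while the packet is written.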
Frame::Ptr RtpEncoderTs::ReadFrame(AVPacket *packet)
{
    if (packet == nullptr) {
        SHARING_LOGE("packet is null!");
        return nullptr;
    }
    while (dataQueue_.empty()) {
        if (exit_ == true) {
            SHARING_LOGI("exit when read frame.");
            return nullptr;
        }
        std::this_thread::sleep_for(std::chrono::microseconds(100)); // 100: wait time
    }

    std::unique_lock<std::mutex> lock(queueMutex_);
    Frame::Ptr frame = dataQueue_.front();
    dataQueue_.pop();
    packet->data = frame->Data();
    packet->size = frame->Size();

    keyFrame_ = frame->KeyFrame();
    if (frame->GetTrackType() == TRACK_VIDEO) {
        packet->dts = av_rescale(frame->Dts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->pts = av_rescale(frame->Pts(), videoStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->stream_index = videoStream->index;
        timeStamp_ = frame->Dts();
    } else if (frame->GetTrackType() == TRACK_AUDIO) {
        packet->dts = av_rescale(frame->Dts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->pts = av_rescale(frame->Pts(), audioStream->time_base.den, WFD_MSEC_IN_SEC);
        packet->stream_index = audioStream->index;
        timeStamp_ = frame->Dts();
    }
    return frame;
}

void RtpEncoderTs::RemoveFrameAfterMuxing()
{
    std::unique_lock<std::mutex> lock(queueMutex_);
    if (!dataQueue_.empty()) {
        dataQueue_.pop();
    }
}

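// AVIO write callback: receives one chunk of muxed TS data from the muxer and, if a sink
// is registered, wraps it in an RTP packet via MakeRtp() and delivers it through onRtpPack_.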
int RtpEncoderTs::WritePacket(void *opaque, uint8_t *buf, int buf_size)
{
    RETURN_INVALID_IF_NULL(opaque);
    RETURN_INVALID_IF_NULL(buf);

    RtpEncoderTs *encoder = (RtpEncoderTs *)opaque;
    std::lock_guard<std::mutex> lock(encoder->cbLockMutex_);
    if (encoder->onRtpPack_) {
        auto rtp =
            encoder->MakeRtp(reinterpret_cast<const void *>(buf), buf_size, encoder->keyFrame_, encoder->timeStamp_);
        encoder->onRtpPack_(rtp);
    }

    return 0;
}
} // namespace Sharing
} // namespace OHOS