1 /*
2 * Copyright 2006, 2007, 2008, 2009, 2010 Fluendo S.A.
3 * Authors: Jan Schmidt <jan@fluendo.com>
4 * Kapil Agrawal <kapil@fluendo.com>
5 * Julien Moutte <julien@fluendo.com>
6 *
7 * Copyright (C) 2011 Jan Schmidt <thaytan@noraisin.net>
8 *
9 * This library is licensed under 3 different licenses and you
10 * can choose to use it under the terms of any one of them. The
11 * three licenses are the MPL 1.1, the LGPL and the MIT license.
12 *
13 * MPL:
14 *
15 * The contents of this file are subject to the Mozilla Public License
16 * Version 1.1 (the "License"); you may not use this file except in
17 * compliance with the License. You may obtain a copy of the License at
18 * http://www.mozilla.org/MPL/.
19 *
20 * Software distributed under the License is distributed on an "AS IS"
21 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
22 * License for the specific language governing rights and limitations
23 * under the License.
24 *
25 * LGPL:
26 *
27 * This library is free software; you can redistribute it and/or
28 * modify it under the terms of the GNU Library General Public
29 * License as published by the Free Software Foundation; either
30 * version 2 of the License, or (at your option) any later version.
31 *
32 * This library is distributed in the hope that it will be useful,
33 * but WITHOUT ANY WARRANTY; without even the implied warranty of
34 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
35 * Library General Public License for more details.
36 *
37 * You should have received a copy of the GNU Library General Public
38 * License along with this library; if not, write to the
39 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
40 * Boston, MA 02110-1301, USA.
41 *
42 * MIT:
43 *
44 * Unless otherwise indicated, Source Code is licensed under MIT license.
45 * See further explanation attached in License Statement (distributed in the file
46 * LICENSE).
47 *
48 * Permission is hereby granted, free of charge, to any person obtaining a copy of
49 * this software and associated documentation files (the "Software"), to deal in
50 * the Software without restriction, including without limitation the rights to
51 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
52 * of the Software, and to permit persons to whom the Software is furnished to do
53 * so, subject to the following conditions:
54 *
55 * The above copyright notice and this permission notice shall be included in all
56 * copies or substantial portions of the Software.
57 *
58 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
59 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
60 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
61 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
62 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
63 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
64 * SOFTWARE.
65 *
66 * SPDX-License-Identifier: MPL-1.1 OR MIT OR LGPL-2.0-or-later
67 */
68 #ifdef HAVE_CONFIG_H
69 #include "config.h"
70 #endif
71
72 #include <stdio.h>
73 #include <string.h>
74
75 #include <gst/tag/tag.h>
76 #include <gst/video/video.h>
77 #include <gst/mpegts/mpegts.h>
78 #include <gst/pbutils/pbutils.h>
79 #include <gst/videoparsers/gstjpeg2000parse.h>
80 #include <gst/video/video-color.h>
81
82 #include "gstbasetsmux.h"
83 #include "gstbasetsmuxaac.h"
84 #include "gstbasetsmuxttxt.h"
85 #include "gstbasetsmuxopus.h"
86 #include "gstbasetsmuxjpeg2000.h"
87
/* Debug category used by the GST_*_OBJECT logging statements in this file;
 * it is selected as the default category through GST_CAT_DEFAULT below. */
GST_DEBUG_CATEGORY (gst_base_ts_mux_debug);
#define GST_CAT_DEFAULT gst_base_ts_mux_debug

/* GstBaseTsMuxPad */

/* Registers GstBaseTsMuxPad with GST_TYPE_AGGREGATOR_PAD as its parent type */
G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD);
94
95 /* Internals */
96
97 static void
gst_base_ts_mux_pad_reset(GstBaseTsMuxPad * pad)98 gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad)
99 {
100 pad->dts = GST_CLOCK_STIME_NONE;
101 pad->prog_id = -1;
102
103 if (pad->free_func)
104 pad->free_func (pad->prepare_data);
105 pad->prepare_data = NULL;
106 pad->prepare_func = NULL;
107 pad->free_func = NULL;
108
109 if (pad->codec_data)
110 gst_buffer_replace (&pad->codec_data, NULL);
111
112 /* reference owned elsewhere */
113 pad->stream = NULL;
114 pad->prog = NULL;
115
116 if (pad->language) {
117 g_free (pad->language);
118 pad->language = NULL;
119 }
120 }
121
122 /* GstAggregatorPad implementation */
123
124 static GstFlowReturn
gst_base_ts_mux_pad_flush(GstAggregatorPad * agg_pad,GstAggregator * agg)125 gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
126 {
127 GList *cur;
128 GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
129
130 /* Send initial segments again after a flush-stop, and also resend the
131 * header sections */
132 g_mutex_lock (&mux->lock);
133 mux->first = TRUE;
134
135 /* output PAT, SI tables */
136 tsmux_resend_pat (mux->tsmux);
137 tsmux_resend_si (mux->tsmux);
138
139 /* output PMT for each program */
140 for (cur = mux->tsmux->programs; cur; cur = cur->next) {
141 TsMuxProgram *program = (TsMuxProgram *) cur->data;
142
143 tsmux_resend_pmt (program);
144 }
145 g_mutex_unlock (&mux->lock);
146
147 return GST_FLOW_OK;
148 }
149
150 /* GObject implementation */
151
152 static void
gst_base_ts_mux_pad_dispose(GObject * obj)153 gst_base_ts_mux_pad_dispose (GObject * obj)
154 {
155 GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj);
156
157 gst_base_ts_mux_pad_reset (ts_pad);
158
159 G_OBJECT_CLASS (gst_base_ts_mux_pad_parent_class)->dispose (obj);
160 }
161
162 static void
gst_base_ts_mux_pad_class_init(GstBaseTsMuxPadClass * klass)163 gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass)
164 {
165 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
166 GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass);
167
168 gobject_class->dispose = gst_base_ts_mux_pad_dispose;
169 gstaggpad_class->flush = gst_base_ts_mux_pad_flush;
170
171 gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX, 0);
172 }
173
/* Instance initialiser for GstBaseTsMuxPad.  The body is empty: no
 * per-instance setup is performed here. */
static void
gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad)
{
}
178
179 /* GstBaseTsMux */
180
/* Property identifiers for GstBaseTsMux; PROP_0 takes the value 0 and the
 * remaining ids follow consecutively. */
enum
{
  PROP_0,
  PROP_PROG_MAP,
  PROP_PAT_INTERVAL,
  PROP_PMT_INTERVAL,
  PROP_ALIGNMENT,
  PROP_SI_INTERVAL,
  PROP_BITRATE,
  PROP_PCR_INTERVAL,
  PROP_SCTE_35_PID,
  PROP_SCTE_35_NULL_INTERVAL
};

/* Presumably the default for the SCTE-35 PID property - its use is outside
 * this part of the file */
#define DEFAULT_SCTE_35_PID 0

/* A negative alignment makes gst_base_ts_mux_push_packets() fall back to
 * mux->automatic_alignment */
#define BASETSMUX_DEFAULT_ALIGNMENT -1

#define CLOCK_BASE 9LL
#define CLOCK_FREQ (CLOCK_BASE * 10000) /* 90 kHz PTS clock */
#define CLOCK_FREQ_SCR (CLOCK_FREQ * 300)       /* 27 MHz SCR clock */
#define TS_MUX_CLOCK_BASE (TSMUX_CLOCK_FREQ * 10 * 360)

/* Converts a signed GStreamer time in nanoseconds to 90 kHz clock ticks,
 * keeping the sign of the input (the magnitude is scaled by
 * CLOCK_BASE / (GST_MSECOND / 10)).  Note that @time is evaluated more than
 * once, so it must not have side effects. */
#define GSTTIME_TO_MPEGTIME(time) \
    (((time) > 0 ? (gint64) 1 : (gint64) -1) * \
    (gint64) gst_util_uint64_scale (ABS(time), CLOCK_BASE, GST_MSECOND/10))
/* 27 MHz SCR conversions: */
#define MPEG_SYS_TIME_TO_GSTTIME(time) (gst_util_uint64_scale ((time), \
            GST_USECOND, CLOCK_FREQ_SCR / 1000000))
#define GSTTIME_TO_MPEG_SYS_TIME(time) (gst_util_uint64_scale ((time), \
            CLOCK_FREQ_SCR / 1000000, GST_USECOND))

/* Program id assigned to a pad when the program map does not name one (or
 * names a negative one); see gst_base_ts_mux_create_pad_stream() */
#define DEFAULT_PROG_ID 0
214
/* Template of the always-present "src" pad: video/mpegts system stream
 * with a packet size of either 188 or 192 bytes. */
static GstStaticPadTemplate gst_base_ts_mux_src_factory =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("video/mpegts, "
        "systemstream = (boolean) true, " "packetsize = (int) { 188, 192} ")
    );

/* An input buffer together with its mapping.  stream_data_new() fills both
 * fields and stream_data_free() unmaps and unrefs the buffer again. */
typedef struct
{
  GstMapInfo map_info;          /* mapping of @buffer (mapped GST_MAP_READ) */
  GstBuffer *buffer;            /* reference owned by this structure */
} StreamData;

/* Registers GstBaseTsMux with GST_TYPE_AGGREGATOR as its parent type;
 * gst_mpegts_initialize() is run as part of the type registration code. */
G_DEFINE_TYPE_WITH_CODE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR,
    gst_mpegts_initialize ());
231
232 /* Internals */
233
234 /* Takes over the ref on the buffer */
235 static StreamData *
stream_data_new(GstBuffer * buffer)236 stream_data_new (GstBuffer * buffer)
237 {
238 StreamData *res = g_new (StreamData, 1);
239 res->buffer = buffer;
240 gst_buffer_map (buffer, &(res->map_info), GST_MAP_READ);
241
242 return res;
243 }
244
245 static void
stream_data_free(StreamData * data)246 stream_data_free (StreamData * data)
247 {
248 if (data) {
249 gst_buffer_unmap (data->buffer, &data->map_info);
250 gst_buffer_unref (data->buffer);
251 g_free (data);
252 }
253 }
254
/* Short alias for the GstBaseTsMux parent class pointer */
#define parent_class gst_base_ts_mux_parent_class
256
257 static void
gst_base_ts_mux_set_header_on_caps(GstBaseTsMux * mux)258 gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
259 {
260 GstBuffer *buf;
261 GstStructure *structure;
262 GValue array = { 0 };
263 GValue value = { 0 };
264 GstCaps *caps;
265
266 caps = gst_pad_get_pad_template_caps (GST_AGGREGATOR_SRC_PAD (mux));
267
268 caps = gst_caps_make_writable (caps);
269 structure = gst_caps_get_structure (caps, 0);
270
271 gst_structure_set (structure, "packetsize", G_TYPE_INT, mux->packet_size,
272 NULL);
273
274 g_value_init (&array, GST_TYPE_ARRAY);
275
276 GST_LOG_OBJECT (mux, "setting %u packets into streamheader",
277 g_queue_get_length (&mux->streamheader));
278
279 while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader)))) {
280 g_value_init (&value, GST_TYPE_BUFFER);
281 gst_value_take_buffer (&value, buf);
282 gst_value_array_append_value (&array, &value);
283 g_value_unset (&value);
284 }
285
286 gst_structure_set_value (structure, "streamheader", &array);
287 gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
288 g_value_unset (&array);
289 gst_caps_unref (caps);
290 }
291
292 static gboolean
steal_si_section(GstMpegtsSectionType * type,TsMuxSection * section,TsMux * mux)293 steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
294 TsMux * mux)
295 {
296 g_hash_table_insert (mux->si_sections, type, section);
297
298 return TRUE;
299 }
300
301 /* Must be called with mux->lock held */
302 static void
gst_base_ts_mux_reset(GstBaseTsMux * mux,gboolean alloc)303 gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
304 {
305 GstBuffer *buf;
306 GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
307 GHashTable *si_sections = NULL;
308 GList *l;
309
310 mux->first = TRUE;
311 mux->last_flow_ret = GST_FLOW_OK;
312 mux->last_ts = GST_CLOCK_TIME_NONE;
313 mux->is_delta = TRUE;
314 mux->is_header = FALSE;
315
316 mux->streamheader_sent = FALSE;
317 mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
318 gst_event_replace (&mux->force_key_unit_event, NULL);
319
320 if (mux->out_adapter)
321 gst_adapter_clear (mux->out_adapter);
322 mux->output_ts_offset = GST_CLOCK_STIME_NONE;
323
324 if (mux->tsmux) {
325 if (mux->tsmux->si_sections)
326 si_sections = g_hash_table_ref (mux->tsmux->si_sections);
327
328 tsmux_free (mux->tsmux);
329 mux->tsmux = NULL;
330 }
331
332 if (mux->programs) {
333 g_hash_table_destroy (mux->programs);
334 }
335 mux->programs = g_hash_table_new (g_direct_hash, g_direct_equal);
336
337 while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader))))
338 gst_buffer_unref (buf);
339
340 gst_event_replace (&mux->force_key_unit_event, NULL);
341 gst_buffer_replace (&mux->out_buffer, NULL);
342
343 GST_OBJECT_LOCK (mux);
344
345 for (l = GST_ELEMENT (mux)->sinkpads; l; l = l->next) {
346 gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (l->data));
347 }
348
349 GST_OBJECT_UNLOCK (mux);
350
351 if (alloc) {
352 g_assert (klass->create_ts_mux);
353
354 mux->tsmux = klass->create_ts_mux (mux);
355
356 /* Preserve user-specified sections across resets */
357 if (si_sections)
358 g_hash_table_foreach_steal (si_sections, (GHRFunc) steal_si_section,
359 mux->tsmux);
360 }
361
362 if (si_sections)
363 g_hash_table_unref (si_sections);
364
365 mux->last_scte35_event_seqnum = GST_SEQNUM_INVALID;
366
367 if (klass->reset)
368 klass->reset (mux);
369 }
370
371 static void
release_buffer_cb(guint8 * data,void * user_data)372 release_buffer_cb (guint8 * data, void *user_data)
373 {
374 stream_data_free ((StreamData *) user_data);
375 }
376
377 /* Must be called with mux->lock held */
378 static GstFlowReturn
gst_base_ts_mux_create_or_update_stream(GstBaseTsMux * mux,GstBaseTsMuxPad * ts_pad,GstCaps * caps)379 gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux,
380 GstBaseTsMuxPad * ts_pad, GstCaps * caps)
381 {
382 GstStructure *s;
383 guint st = TSMUX_ST_RESERVED;
384 const gchar *mt;
385 const GValue *value = NULL;
386 GstBuffer *codec_data = NULL;
387 guint8 opus_channel_config_code = 0;
388 guint16 profile = GST_JPEG2000_PARSE_PROFILE_NONE;
389 guint8 main_level = 0;
390 guint32 max_rate = 0;
391 guint8 color_spec = 0;
392 const gchar *stream_format = NULL;
393 const char *interlace_mode = NULL;
394 gchar *pmt_name;
395
396 GST_DEBUG_OBJECT (ts_pad,
397 "%s stream with PID 0x%04x for caps %" GST_PTR_FORMAT,
398 ts_pad->stream ? "Recreating" : "Creating", ts_pad->pid, caps);
399
400 s = gst_caps_get_structure (caps, 0);
401
402 mt = gst_structure_get_name (s);
403 value = gst_structure_get_value (s, "codec_data");
404 if (value != NULL)
405 codec_data = gst_value_get_buffer (value);
406
407 g_clear_pointer (&ts_pad->codec_data, gst_buffer_unref);
408 ts_pad->prepare_func = NULL;
409
410 stream_format = gst_structure_get_string (s, "stream-format");
411
412 if (strcmp (mt, "video/x-dirac") == 0) {
413 st = TSMUX_ST_VIDEO_DIRAC;
414 } else if (strcmp (mt, "audio/x-ac3") == 0) {
415 st = TSMUX_ST_PS_AUDIO_AC3;
416 } else if (strcmp (mt, "audio/x-dts") == 0) {
417 st = TSMUX_ST_PS_AUDIO_DTS;
418 } else if (strcmp (mt, "audio/x-lpcm") == 0) {
419 st = TSMUX_ST_PS_AUDIO_LPCM;
420 } else if (strcmp (mt, "video/x-h264") == 0) {
421 st = TSMUX_ST_VIDEO_H264;
422 } else if (strcmp (mt, "video/x-h265") == 0) {
423 st = TSMUX_ST_VIDEO_HEVC;
424 } else if (strcmp (mt, "audio/mpeg") == 0) {
425 gint mpegversion;
426
427 if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
428 GST_ERROR_OBJECT (ts_pad, "caps missing mpegversion");
429 goto not_negotiated;
430 }
431
432 switch (mpegversion) {
433 case 1:{
434 int mpegaudioversion = 1; /* Assume mpegaudioversion=1 for backwards compatibility */
435 (void) gst_structure_get_int (s, "mpegaudioversion", &mpegaudioversion);
436
437 if (mpegaudioversion == 1)
438 st = TSMUX_ST_AUDIO_MPEG1;
439 else
440 st = TSMUX_ST_AUDIO_MPEG2;
441 break;
442 }
443 case 2:{
444 /* mpegversion=2 in GStreamer refers to MPEG-2 Part 7 audio, */
445
446 st = TSMUX_ST_AUDIO_AAC;
447
448 /* Check the stream format. If raw, make dummy internal codec data from the caps */
449 if (g_strcmp0 (stream_format, "raw") == 0) {
450 ts_pad->codec_data =
451 gst_base_ts_mux_aac_mpeg2_make_codec_data (mux, caps);
452 ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg2;
453 if (ts_pad->codec_data == NULL) {
454 GST_ERROR_OBJECT (mux, "Invalid or incomplete caps for MPEG-2 AAC");
455 goto not_negotiated;
456 }
457 }
458 break;
459 }
460 case 4:
461 {
462 st = TSMUX_ST_AUDIO_AAC;
463
464 /* Check the stream format. We need codec_data with RAW streams and mpegversion=4 */
465 if (g_strcmp0 (stream_format, "raw") == 0) {
466 if (codec_data) {
467 GST_DEBUG_OBJECT (ts_pad,
468 "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
469 gst_buffer_get_size (codec_data));
470 ts_pad->codec_data = gst_buffer_ref (codec_data);
471 ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg4;
472 } else {
473 ts_pad->codec_data = NULL;
474 GST_ERROR_OBJECT (mux, "Need codec_data for raw MPEG-4 AAC");
475 goto not_negotiated;
476 }
477 } else if (codec_data) {
478 ts_pad->codec_data = gst_buffer_ref (codec_data);
479 } else {
480 ts_pad->codec_data = NULL;
481 }
482 break;
483 }
484 default:
485 GST_WARNING_OBJECT (ts_pad, "unsupported mpegversion %d", mpegversion);
486 goto not_negotiated;
487 }
488 } else if (strcmp (mt, "video/mpeg") == 0) {
489 gint mpegversion;
490
491 if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
492 GST_ERROR_OBJECT (ts_pad, "caps missing mpegversion");
493 goto not_negotiated;
494 }
495
496 switch (mpegversion) {
497 case 1:
498 st = TSMUX_ST_VIDEO_MPEG1;
499 break;
500 case 2:
501 st = TSMUX_ST_VIDEO_MPEG2;
502 break;
503 case 4:
504 st = TSMUX_ST_VIDEO_MPEG4;
505 break;
506 default:
507 GST_WARNING_OBJECT (ts_pad, "unsupported mpegversion %d", mpegversion);
508 goto not_negotiated;
509 }
510 } else if (strcmp (mt, "subpicture/x-dvb") == 0) {
511 st = TSMUX_ST_PS_DVB_SUBPICTURE;
512 } else if (strcmp (mt, "application/x-teletext") == 0) {
513 st = TSMUX_ST_PS_TELETEXT;
514 /* needs a particularly sized layout */
515 ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext;
516 } else if (strcmp (mt, "audio/x-opus") == 0) {
517 guint8 channels, mapping_family, stream_count, coupled_count;
518 guint8 channel_mapping[256];
519
520 if (!gst_codec_utils_opus_parse_caps (caps, NULL, &channels,
521 &mapping_family, &stream_count, &coupled_count, channel_mapping)) {
522 GST_ERROR_OBJECT (ts_pad, "Incomplete Opus caps");
523 goto not_negotiated;
524 }
525
526 if (channels <= 2 && mapping_family == 0) {
527 opus_channel_config_code = channels;
528 } else if (channels == 2 && mapping_family == 255 && stream_count == 1
529 && coupled_count == 1) {
530 /* Dual mono */
531 opus_channel_config_code = 0;
532 } else if (channels >= 2 && channels <= 8 && mapping_family == 1) {
533 static const guint8 coupled_stream_counts[9] = {
534 1, 0, 1, 1, 2, 2, 2, 3, 3
535 };
536 static const guint8 channel_map_a[8][8] = {
537 {0},
538 {0, 1},
539 {0, 2, 1},
540 {0, 1, 2, 3},
541 {0, 4, 1, 2, 3},
542 {0, 4, 1, 2, 3, 5},
543 {0, 4, 1, 2, 3, 5, 6},
544 {0, 6, 1, 2, 3, 4, 5, 7},
545 };
546 static const guint8 channel_map_b[8][8] = {
547 {0},
548 {0, 1},
549 {0, 1, 2},
550 {0, 1, 2, 3},
551 {0, 1, 2, 3, 4},
552 {0, 1, 2, 3, 4, 5},
553 {0, 1, 2, 3, 4, 5, 6},
554 {0, 1, 2, 3, 4, 5, 6, 7},
555 };
556
557 /* Vorbis mapping */
558 if (stream_count == channels - coupled_stream_counts[channels] &&
559 coupled_count == coupled_stream_counts[channels] &&
560 memcmp (channel_mapping, channel_map_a[channels - 1],
561 channels) == 0) {
562 opus_channel_config_code = channels;
563 } else if (stream_count == channels - coupled_stream_counts[channels] &&
564 coupled_count == coupled_stream_counts[channels] &&
565 memcmp (channel_mapping, channel_map_b[channels - 1],
566 channels) == 0) {
567 opus_channel_config_code = channels | 0x80;
568 } else {
569 GST_FIXME_OBJECT (ts_pad, "Opus channel mapping not handled");
570 goto not_negotiated;
571 }
572 }
573
574 st = TSMUX_ST_PS_OPUS;
575 ts_pad->prepare_func = gst_base_ts_mux_prepare_opus;
576 } else if (strcmp (mt, "meta/x-klv") == 0) {
577 st = TSMUX_ST_PS_KLV;
578 } else if (strcmp (mt, "image/x-jpc") == 0) {
579 /*
580 * See this document for more details on standard:
581 *
582 * https://www.itu.int/rec/T-REC-H.222.0-201206-S/en
583 * Annex S describes J2K details
584 * Page 104 of this document describes J2k video descriptor
585 */
586
587 const GValue *vProfile = gst_structure_get_value (s, "profile");
588 const GValue *vMainlevel = gst_structure_get_value (s, "main-level");
589 const GValue *vFramerate = gst_structure_get_value (s, "framerate");
590 const GValue *vColorimetry = gst_structure_get_value (s, "colorimetry");
591 j2k_private_data *private_data;
592
593 /* for now, we relax the condition that profile must exist and equal
594 * GST_JPEG2000_PARSE_PROFILE_BC_SINGLE */
595 if (vProfile) {
596 profile = g_value_get_int (vProfile);
597 if (profile != GST_JPEG2000_PARSE_PROFILE_BC_SINGLE) {
598 GST_LOG_OBJECT (ts_pad, "Invalid JPEG 2000 profile %d", profile);
599 /* goto not_negotiated; */
600 }
601 }
602 /* for now, we will relax the condition that the main level must be present */
603 if (vMainlevel) {
604 main_level = g_value_get_uint (vMainlevel);
605 if (main_level > 11) {
606 GST_ERROR_OBJECT (ts_pad, "Invalid main level %d", main_level);
607 goto not_negotiated;
608 }
609 if (main_level >= 6) {
610 max_rate = 2 ^ (main_level - 6) * 1600 * 1000000;
611 } else {
612 switch (main_level) {
613 case 0:
614 case 1:
615 case 2:
616 case 3:
617 max_rate = 200 * 1000000;
618 break;
619 case 4:
620 max_rate = 400 * 1000000;
621 break;
622 case 5:
623 max_rate = 800 * 1000000;
624 break;
625 default:
626 break;
627 }
628 }
629 } else {
630 /* GST_ERROR_OBJECT (ts_pad, "Missing main level");
631 * goto not_negotiated; */
632 }
633
634 /* We always mux video in J2K-over-MPEG-TS non-interlaced mode */
635 private_data = g_new0 (j2k_private_data, 1);
636 private_data->interlace = FALSE;
637 private_data->den = 0;
638 private_data->num = 0;
639 private_data->max_bitrate = max_rate;
640 private_data->color_spec = 1;
641 /* these two fields are not used, since we always mux as non-interlaced */
642 private_data->Fic = 1;
643 private_data->Fio = 0;
644
645 /* Get Framerate */
646 if (vFramerate != NULL) {
647 /* Data for ELSM header */
648 private_data->num = gst_value_get_fraction_numerator (vFramerate);
649 private_data->den = gst_value_get_fraction_denominator (vFramerate);
650 }
651 /* Get Colorimetry */
652 if (vColorimetry) {
653 const char *colorimetry = g_value_get_string (vColorimetry);
654 color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_SRGB; /* RGB as default */
655 if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT601)) {
656 color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC601;
657 } else {
658 if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT709)
659 || g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_SMPTE240M)) {
660 color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC709;
661 }
662 }
663 private_data->color_spec = color_spec;
664 } else {
665 GST_ERROR_OBJECT (ts_pad, "Colorimetry not present in caps");
666 g_free (private_data);
667 goto not_negotiated;
668 }
669 st = TSMUX_ST_VIDEO_JP2K;
670 ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
671 ts_pad->prepare_data = private_data;
672 ts_pad->free_func = gst_base_ts_mux_free_jpeg2000;
673 } else {
674 GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
675
676 if (klass->handle_media_type) {
677 st = klass->handle_media_type (mux, mt, ts_pad);
678 }
679 }
680
681 if (st == TSMUX_ST_RESERVED) {
682 GST_ERROR_OBJECT (ts_pad, "Failed to determine stream type");
683 goto error;
684 }
685
686 if (ts_pad->stream && st != ts_pad->stream->stream_type) {
687 GST_ELEMENT_ERROR (mux, STREAM, MUX,
688 ("Stream type change from %02x to %02x not supported",
689 ts_pad->stream->stream_type, st), NULL);
690 goto error;
691 }
692
693 if (ts_pad->stream == NULL) {
694 ts_pad->stream =
695 tsmux_create_stream (mux->tsmux, st, ts_pad->pid, ts_pad->language);
696 if (ts_pad->stream == NULL)
697 goto error;
698 }
699
700 pmt_name = g_strdup_printf ("PMT_%d", ts_pad->pid);
701 if (mux->prog_map && gst_structure_has_field (mux->prog_map, pmt_name)) {
702 gst_structure_get_int (mux->prog_map, pmt_name, &ts_pad->stream->pmt_index);
703 }
704 g_free (pmt_name);
705
706 interlace_mode = gst_structure_get_string (s, "interlace-mode");
707 gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling);
708 gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels);
709 gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate);
710
711 /* frame rate */
712 gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num,
713 &ts_pad->stream->den);
714
715 /* Interlace mode */
716 ts_pad->stream->interlace_mode = FALSE;
717 if (interlace_mode) {
718 ts_pad->stream->interlace_mode =
719 g_str_equal (interlace_mode, "interleaved");
720 }
721
722 /* Width and Height */
723 gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size);
724 gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size);
725
726 ts_pad->stream->color_spec = color_spec;
727 ts_pad->stream->max_bitrate = max_rate;
728 ts_pad->stream->profile_and_level = profile | main_level;
729
730 ts_pad->stream->opus_channel_config_code = opus_channel_config_code;
731
732 tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb);
733
734 return GST_FLOW_OK;
735
736 /* ERRORS */
737 not_negotiated:
738 return GST_FLOW_NOT_NEGOTIATED;
739
740 error:
741 return GST_FLOW_ERROR;
742 }
743
744 static gboolean
is_valid_pmt_pid(guint16 pmt_pid)745 is_valid_pmt_pid (guint16 pmt_pid)
746 {
747 if (pmt_pid < 0x0010 || pmt_pid > 0x1ffe)
748 return FALSE;
749 return TRUE;
750 }
751
752 /* Must be called with mux->lock held */
753 static GstFlowReturn
gst_base_ts_mux_create_stream(GstBaseTsMux * mux,GstBaseTsMuxPad * ts_pad)754 gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
755 {
756 GstCaps *caps = gst_pad_get_current_caps (GST_PAD (ts_pad));
757 GstFlowReturn ret;
758
759 if (caps == NULL) {
760 GST_DEBUG_OBJECT (ts_pad, "Sink pad caps were not set before pushing");
761 return GST_FLOW_NOT_NEGOTIATED;
762 }
763
764 ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
765 gst_caps_unref (caps);
766
767 if (ret == GST_FLOW_OK) {
768 tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
769 }
770
771 return ret;
772 }
773
774 /* Must be called with mux->lock held */
775 static GstFlowReturn
gst_base_ts_mux_create_pad_stream(GstBaseTsMux * mux,GstPad * pad)776 gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
777 {
778 GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
779 gchar *name = NULL;
780 gchar *prop_name;
781 GstFlowReturn ret = GST_FLOW_OK;
782
783 if (ts_pad->prog_id == -1) {
784 name = GST_PAD_NAME (pad);
785 if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) {
786 gint idx;
787 gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
788 if (!ret) {
789 GST_ELEMENT_ERROR (mux, STREAM, MUX,
790 ("Reading program map failed. Assuming default"), (NULL));
791 idx = DEFAULT_PROG_ID;
792 }
793 if (idx < 0) {
794 GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
795 "than zero; DEFAULT_PROGRAM = %d is used instead",
796 idx, name, DEFAULT_PROG_ID);
797 idx = DEFAULT_PROG_ID;
798 }
799 ts_pad->prog_id = idx;
800 } else {
801 ts_pad->prog_id = DEFAULT_PROG_ID;
802 }
803 }
804
805 ts_pad->prog =
806 (TsMuxProgram *) g_hash_table_lookup (mux->programs,
807 GINT_TO_POINTER (ts_pad->prog_id));
808 if (ts_pad->prog == NULL) {
809 ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
810 if (ts_pad->prog == NULL)
811 goto no_program;
812 tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
813 tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
814 tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval);
815 g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
816 ts_pad->prog);
817
818 /* Check for user-specified PMT PID */
819 prop_name = g_strdup_printf ("PMT_%d", ts_pad->prog->pgm_number);
820 if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
821 guint pmt_pid;
822
823 if (gst_structure_get_uint (mux->prog_map, prop_name, &pmt_pid)) {
824 if (is_valid_pmt_pid (pmt_pid)) {
825 GST_DEBUG_OBJECT (mux, "User specified pid=%u as PMT for "
826 "program (prog_id = %d)", pmt_pid, ts_pad->prog->pgm_number);
827 tsmux_program_set_pmt_pid (ts_pad->prog, pmt_pid);
828 } else {
829 GST_ELEMENT_WARNING (mux, LIBRARY, SETTINGS,
830 ("User specified PMT pid %u for program %d is not valid.",
831 pmt_pid, ts_pad->prog->pgm_number), (NULL));
832 }
833 }
834 }
835 g_free (prop_name);
836 }
837
838 if (ts_pad->stream == NULL) {
839 ret = gst_base_ts_mux_create_stream (mux, ts_pad);
840 if (ret != GST_FLOW_OK)
841 goto no_stream;
842 }
843
844 if (ts_pad->prog->pcr_stream == NULL) {
845 /* Take the first stream of the program for the PCR */
846 GST_DEBUG_OBJECT (ts_pad,
847 "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
848 ts_pad->pid, ts_pad->prog_id);
849
850 tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
851 }
852
853 /* Check for user-specified PCR PID */
854 prop_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
855 if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
856 const gchar *sink_name =
857 gst_structure_get_string (mux->prog_map, prop_name);
858
859 if (!g_strcmp0 (name, sink_name)) {
860 GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
861 "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
862 tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
863 }
864 }
865 g_free (prop_name);
866
867 return ret;
868
869 /* ERRORS */
870 no_program:
871 {
872 GST_ELEMENT_ERROR (mux, STREAM, MUX,
873 ("Could not create new program"), (NULL));
874 return GST_FLOW_ERROR;
875 }
876 no_stream:
877 {
878 GST_ELEMENT_ERROR (mux, STREAM, MUX,
879 ("Could not create handler for stream"), (NULL));
880 return ret;
881 }
882 }
883
884 /* Must be called with mux->lock held */
885 static gboolean
gst_base_ts_mux_create_pad_stream_func(GstElement * element,GstPad * pad,gpointer user_data)886 gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
887 gpointer user_data)
888 {
889 GstFlowReturn *ret = user_data;
890
891 *ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
892
893 return *ret == GST_FLOW_OK;
894 }
895
896 /* Must be called with mux->lock held */
897 static GstFlowReturn
gst_base_ts_mux_create_streams(GstBaseTsMux * mux)898 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
899 {
900 GstFlowReturn ret = GST_FLOW_OK;
901
902 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (mux),
903 gst_base_ts_mux_create_pad_stream_func, &ret);
904
905 return ret;
906 }
907
908 static void
new_packet_common_init(GstBaseTsMux * mux,GstBuffer * buf,guint8 * data,guint len)909 new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
910 guint len)
911 {
912 /* Packets should be at least 188 bytes, but check anyway */
913 g_assert (len >= 2 || !data);
914
915 if (!mux->streamheader_sent && data) {
916 guint pid = ((data[1] & 0x1f) << 8) | data[2];
917 /* if it's a PAT or a PMT */
918 if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
919 GstBuffer *hbuf;
920
921 if (!buf) {
922 hbuf = gst_buffer_new_and_alloc (len);
923 gst_buffer_fill (hbuf, 0, data, len);
924 } else {
925 hbuf = gst_buffer_copy (buf);
926 }
927 GST_LOG_OBJECT (mux,
928 "Collecting packet with pid 0x%04x into streamheaders", pid);
929
930 g_queue_push_tail (&mux->streamheader, hbuf);
931 } else if (!g_queue_is_empty (&mux->streamheader)) {
932 gst_base_ts_mux_set_header_on_caps (mux);
933 mux->streamheader_sent = TRUE;
934 }
935 }
936
937 if (buf) {
938 if (mux->is_header) {
939 GST_LOG_OBJECT (mux, "marking as header buffer");
940 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER);
941 }
942 if (mux->is_delta) {
943 GST_LOG_OBJECT (mux, "marking as delta unit");
944 GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
945 } else {
946 GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
947 mux->is_delta = TRUE;
948 }
949 }
950 }
951
952 static GstFlowReturn
gst_base_ts_mux_push_packets(GstBaseTsMux * mux,gboolean force)953 gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
954 {
955 GstBufferList *buffer_list;
956 gint align = mux->alignment;
957 gint av, packet_size;
958
959 packet_size = mux->packet_size;
960
961 if (align < 0)
962 align = mux->automatic_alignment;
963
964 av = gst_adapter_available (mux->out_adapter);
965 GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
966
967 if (av == 0)
968 return GST_FLOW_OK;
969
970 /* no alignment, just push all available data */
971 if (align == 0) {
972 buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
973 return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux),
974 buffer_list);
975 }
976
977 align *= packet_size;
978
979 if (!force && align > av)
980 return GST_FLOW_OK;
981
982 buffer_list = gst_buffer_list_new_sized ((av / align) + 1);
983
984 GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
985 while (align <= av) {
986 GstBuffer *buf;
987 GstClockTime pts;
988
989 pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
990 buf = gst_adapter_take_buffer (mux->out_adapter, align);
991
992 GST_BUFFER_PTS (buf) = pts;
993
994 gst_buffer_list_add (buffer_list, buf);
995 av -= align;
996 }
997
998 if (av > 0 && force) {
999 GstBuffer *buf;
1000 GstClockTime pts;
1001 guint8 *data;
1002 guint32 header;
1003 gint dummy;
1004 GstMapInfo map;
1005
1006 GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
1007
1008 pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1009 buf = gst_buffer_new_and_alloc (align);
1010
1011 GST_BUFFER_PTS (buf) = pts;
1012
1013 gst_buffer_map (buf, &map, GST_MAP_READ);
1014 data = map.data;
1015
1016 gst_adapter_copy (mux->out_adapter, data, 0, av);
1017 gst_adapter_clear (mux->out_adapter);
1018
1019 data += av;
1020 header = GST_READ_UINT32_BE (data - packet_size);
1021
1022 dummy = (map.size - av) / packet_size;
1023 GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
1024
1025 for (; dummy > 0; dummy--) {
1026 gint offset;
1027
1028 if (packet_size > GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH) {
1029 GST_WRITE_UINT32_BE (data, header);
1030 /* simply increase header a bit and never mind too much */
1031 header++;
1032 offset = 4;
1033 } else {
1034 offset = 0;
1035 }
1036 GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
1037 /* null packet PID */
1038 GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
1039 /* no adaptation field exists | continuity counter undefined */
1040 GST_WRITE_UINT8 (data + offset + 3, 0x10);
1041 /* payload */
1042 memset (data + offset + 4, 0, GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH - 4);
1043 data += packet_size;
1044 }
1045
1046 gst_buffer_unmap (buf, &map);
1047 gst_buffer_list_add (buffer_list, buf);
1048 }
1049
1050 return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux), buffer_list);
1051 }
1052
1053 static GstFlowReturn
gst_base_ts_mux_collect_packet(GstBaseTsMux * mux,GstBuffer * buf)1054 gst_base_ts_mux_collect_packet (GstBaseTsMux * mux, GstBuffer * buf)
1055 {
1056 GST_LOG_OBJECT (mux, "collecting packet size %" G_GSIZE_FORMAT,
1057 gst_buffer_get_size (buf));
1058 gst_adapter_push (mux->out_adapter, buf);
1059
1060 return GST_FLOW_OK;
1061 }
1062
1063 static GstEvent *
check_pending_key_unit_event(GstEvent * pending_event,GstSegment * segment,GstClockTime timestamp,guint flags,GstClockTime pending_key_unit_ts)1064 check_pending_key_unit_event (GstEvent * pending_event, GstSegment * segment,
1065 GstClockTime timestamp, guint flags, GstClockTime pending_key_unit_ts)
1066 {
1067 GstClockTime running_time, stream_time;
1068 gboolean all_headers;
1069 guint count;
1070 GstEvent *event = NULL;
1071
1072 g_assert (segment != NULL);
1073
1074 if (pending_event == NULL)
1075 goto out;
1076
1077 if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1078 timestamp == GST_CLOCK_TIME_NONE)
1079 goto out;
1080
1081 running_time = timestamp;
1082
1083 GST_INFO ("now %" GST_TIME_FORMAT " wanted %" GST_TIME_FORMAT,
1084 GST_TIME_ARGS (running_time), GST_TIME_ARGS (pending_key_unit_ts));
1085 if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1086 running_time < pending_key_unit_ts)
1087 goto out;
1088
1089 if (flags & GST_BUFFER_FLAG_DELTA_UNIT) {
1090 GST_INFO ("pending force key unit, waiting for keyframe");
1091 goto out;
1092 }
1093
1094 stream_time = gst_segment_to_stream_time (segment,
1095 GST_FORMAT_TIME, timestamp);
1096
1097 if (GST_EVENT_TYPE (pending_event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
1098 gst_video_event_parse_downstream_force_key_unit (pending_event,
1099 NULL, NULL, NULL, &all_headers, &count);
1100 } else {
1101 gst_video_event_parse_upstream_force_key_unit (pending_event, NULL,
1102 &all_headers, &count);
1103 }
1104
1105 event =
1106 gst_video_event_new_downstream_force_key_unit (timestamp, stream_time,
1107 running_time, all_headers, count);
1108 gst_event_set_seqnum (event, gst_event_get_seqnum (pending_event));
1109
1110 out:
1111 return event;
1112 }
1113
1114 /* Called when the TsMux has prepared a packet for output. Return FALSE
1115 * on error */
1116 static gboolean
new_packet_cb(GstBuffer * buf,void * user_data,gint64 new_pcr)1117 new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
1118 {
1119 GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1120 GstAggregator *agg = GST_AGGREGATOR (mux);
1121 GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1122 GstMapInfo map;
1123 GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1124
1125 g_assert (klass->output_packet);
1126
1127 gst_buffer_map (buf, &map, GST_MAP_READWRITE);
1128
1129 if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1130 /* tsmux isn't generating timestamps. Use the input times */
1131 GST_BUFFER_PTS (buf) = mux->last_ts;
1132 }
1133
1134 if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1135 if (!GST_CLOCK_STIME_IS_VALID (mux->output_ts_offset)) {
1136 GstClockTime output_start_time = agg_segment->position;
1137 if (agg_segment->position == -1
1138 || agg_segment->position < agg_segment->start) {
1139 output_start_time = agg_segment->start;
1140 }
1141
1142 mux->output_ts_offset =
1143 GST_CLOCK_DIFF (GST_BUFFER_PTS (buf), output_start_time);
1144
1145 GST_DEBUG_OBJECT (mux, "New output ts offset %" GST_STIME_FORMAT,
1146 GST_STIME_ARGS (mux->output_ts_offset));
1147 }
1148
1149 GST_BUFFER_PTS (buf) += mux->output_ts_offset;
1150
1151 agg_segment->position = GST_BUFFER_PTS (buf);
1152 } else if (agg_segment->position == -1
1153 || agg_segment->position < agg_segment->start) {
1154 GST_BUFFER_PTS (buf) = agg_segment->start;
1155 } else {
1156 GST_BUFFER_PTS (buf) = agg_segment->position;
1157 }
1158
1159 /* do common init (flags and streamheaders) */
1160 new_packet_common_init (mux, buf, map.data, map.size);
1161
1162 gst_buffer_unmap (buf, &map);
1163
1164 return klass->output_packet (mux, buf, new_pcr);
1165 }
1166
1167 /* called when TsMux needs new packet to write into */
1168 static void
alloc_packet_cb(GstBuffer ** buf,void * user_data)1169 alloc_packet_cb (GstBuffer ** buf, void *user_data)
1170 {
1171 GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1172 GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1173
1174 g_assert (klass->allocate_packet);
1175
1176 klass->allocate_packet (mux, buf);
1177 }
1178
1179 static GstFlowReturn
gst_base_ts_mux_aggregate_buffer(GstBaseTsMux * mux,GstAggregatorPad * agg_pad,GstBuffer * buf)1180 gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
1181 GstAggregatorPad * agg_pad, GstBuffer * buf)
1182 {
1183 GstFlowReturn ret = GST_FLOW_OK;
1184 GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad);
1185 TsMuxProgram *prog;
1186 gint64 pts = GST_CLOCK_STIME_NONE;
1187 gint64 dts = GST_CLOCK_STIME_NONE;
1188 gboolean delta = TRUE, header = FALSE;
1189 StreamData *stream_data;
1190 GstMpegtsSection *scte_section = NULL;
1191
1192 GST_DEBUG_OBJECT (mux, "Pads collected");
1193
1194 if (buf && gst_buffer_get_size (buf) == 0
1195 && GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)) {
1196 gst_buffer_unref (buf);
1197 return GST_FLOW_OK;
1198 }
1199
1200 g_mutex_lock (&mux->lock);
1201 if (G_UNLIKELY (mux->first)) {
1202 ret = gst_base_ts_mux_create_streams (mux);
1203 if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1204 if (buf)
1205 gst_buffer_unref (buf);
1206 g_mutex_unlock (&mux->lock);
1207 return ret;
1208 }
1209
1210 mux->first = FALSE;
1211 }
1212
1213 prog = best->prog;
1214 if (prog == NULL) {
1215 GList *cur;
1216
1217 gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
1218 tsmux_resend_pat (mux->tsmux);
1219 tsmux_resend_si (mux->tsmux);
1220 prog = best->prog;
1221 g_assert_nonnull (prog);
1222
1223 /* output PMT for each program */
1224 for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1225 TsMuxProgram *program = (TsMuxProgram *) cur->data;
1226
1227 tsmux_resend_pmt (program);
1228 }
1229 }
1230
1231 g_assert (buf != NULL);
1232
1233 if (best->prepare_func) {
1234 GstBuffer *tmp;
1235
1236 tmp = best->prepare_func (buf, best, mux);
1237 g_assert (tmp);
1238 gst_buffer_unref (buf);
1239 buf = tmp;
1240 }
1241
1242 if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
1243 GstEvent *event;
1244
1245 g_mutex_unlock (&mux->lock);
1246 event = check_pending_key_unit_event (mux->force_key_unit_event,
1247 &agg_pad->segment, GST_BUFFER_PTS (buf),
1248 GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
1249 if (event) {
1250 GstClockTime running_time;
1251 guint count;
1252 GList *cur;
1253
1254 mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
1255 gst_event_replace (&mux->force_key_unit_event, NULL);
1256
1257 gst_video_event_parse_downstream_force_key_unit (event,
1258 NULL, NULL, &running_time, NULL, &count);
1259
1260 GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
1261 "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
1262 GST_TIME_ARGS (running_time), count);
1263 gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
1264
1265 g_mutex_lock (&mux->lock);
1266 /* output PAT, SI tables */
1267 tsmux_resend_pat (mux->tsmux);
1268 tsmux_resend_si (mux->tsmux);
1269
1270 /* output PMT for each program */
1271 for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1272 TsMuxProgram *program = (TsMuxProgram *) cur->data;
1273
1274 tsmux_resend_pmt (program);
1275 }
1276 } else {
1277 g_mutex_lock (&mux->lock);
1278 }
1279 }
1280
1281 if (G_UNLIKELY (prog->pcr_stream == NULL)) {
1282 /* Take the first data stream for the PCR */
1283 GST_DEBUG_OBJECT (best,
1284 "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
1285 best->pid, best->prog_id);
1286
1287 /* Set the chosen PCR stream */
1288 tsmux_program_set_pcr_stream (prog, best->stream);
1289 }
1290
1291 GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
1292
1293 GST_OBJECT_LOCK (mux);
1294 scte_section = mux->pending_scte35_section;
1295 mux->pending_scte35_section = NULL;
1296 GST_OBJECT_UNLOCK (mux);
1297 if (G_UNLIKELY (scte_section)) {
1298 GST_DEBUG_OBJECT (mux, "Sending pending SCTE section");
1299 if (!tsmux_send_section (mux->tsmux, scte_section))
1300 GST_ERROR_OBJECT (mux, "Error sending SCTE section !");
1301 }
1302
1303 if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1304 pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
1305 GST_DEBUG_OBJECT (mux, "Buffer has PTS %" GST_TIME_FORMAT " pts %"
1306 G_GINT64_FORMAT "%s", GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts,
1307 !GST_BUFFER_FLAG_IS_SET (buf,
1308 GST_BUFFER_FLAG_DELTA_UNIT) ? " (keyframe)" : "");
1309 }
1310
1311 if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
1312 dts = GSTTIME_TO_MPEGTIME (best->dts);
1313 GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
1314 G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
1315 }
1316
1317 /* should not have a DTS without PTS */
1318 if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
1319 GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
1320 pts = dts;
1321 }
1322
1323 if (best->stream->is_video_stream) {
1324 delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1325 header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
1326 }
1327
1328 if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
1329 GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
1330
1331 gst_buffer_unref (buf);
1332 g_mutex_unlock (&mux->lock);
1333 return GST_FLOW_OK;
1334 }
1335
1336 GST_DEBUG_OBJECT (mux, "delta: %d", delta);
1337
1338 if (gst_buffer_get_size (buf) > 0) {
1339 stream_data = stream_data_new (buf);
1340 tsmux_stream_add_data (best->stream, stream_data->map_info.data,
1341 stream_data->map_info.size, stream_data, pts, dts, !delta);
1342 }
1343
1344 /* outgoing ts follows ts of PCR program stream */
1345 if (prog->pcr_stream == best->stream) {
1346 /* prefer DTS if present for PCR as it should be monotone */
1347 mux->last_ts =
1348 GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
1349 GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
1350 }
1351
1352 mux->is_delta = delta;
1353 mux->is_header = header;
1354 while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
1355 if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
1356 /* Failed writing data for some reason. Set appropriate error */
1357 GST_DEBUG_OBJECT (mux, "Failed to write data packet");
1358 GST_ELEMENT_ERROR (mux, STREAM, MUX,
1359 ("Failed writing output data to stream %04x", best->stream->id),
1360 (NULL));
1361 goto write_fail;
1362 }
1363 }
1364 g_mutex_unlock (&mux->lock);
1365 /* flush packet cache */
1366 return gst_base_ts_mux_push_packets (mux, FALSE);
1367
1368 /* ERRORS */
1369 write_fail:
1370 {
1371 return mux->last_flow_ret;
1372 }
1373 }
1374
1375 /* GstElement implementation */
1376 static gboolean
gst_base_ts_mux_has_pad_with_pid(GstBaseTsMux * mux,guint16 pid)1377 gst_base_ts_mux_has_pad_with_pid (GstBaseTsMux * mux, guint16 pid)
1378 {
1379 GList *l;
1380 gboolean res = FALSE;
1381
1382 GST_OBJECT_LOCK (mux);
1383
1384 for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1385 GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
1386
1387 if (tpad->pid == pid) {
1388 res = TRUE;
1389 break;
1390 }
1391 }
1392
1393 GST_OBJECT_UNLOCK (mux);
1394 return res;
1395 }
1396
1397 static GstPad *
gst_base_ts_mux_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)1398 gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
1399 const gchar * name, const GstCaps * caps)
1400 {
1401 GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1402 gint pid = -1;
1403 GstPad *pad = NULL;
1404 gchar *free_name = NULL;
1405
1406 g_mutex_lock (&mux->lock);
1407 if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
1408 if (tsmux_find_stream (mux->tsmux, pid)) {
1409 g_mutex_unlock (&mux->lock);
1410 goto stream_exists;
1411 }
1412 /* Make sure we don't use reserved PID.
1413 * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
1414 if (pid < TSMUX_START_ES_PID)
1415 goto invalid_stream_pid;
1416 } else {
1417 do {
1418 pid = tsmux_get_new_pid (mux->tsmux);
1419 } while (gst_base_ts_mux_has_pad_with_pid (mux, pid));
1420
1421 /* Name the pad correctly after the selected pid */
1422 name = free_name = g_strdup_printf ("sink_%d", pid);
1423 }
1424 g_mutex_unlock (&mux->lock);
1425
1426 pad = (GstPad *)
1427 GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
1428 templ, name, caps);
1429
1430 gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad));
1431 GST_BASE_TS_MUX_PAD (pad)->pid = pid;
1432
1433 g_free (free_name);
1434
1435 return pad;
1436
1437 /* ERRORS */
1438 stream_exists:
1439 {
1440 GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"),
1441 (NULL));
1442 return NULL;
1443 }
1444
1445 invalid_stream_pid:
1446 {
1447 GST_ELEMENT_ERROR (element, STREAM, MUX,
1448 ("Invalid Elementary stream PID (0x%02u < 0x40)", pid), (NULL));
1449 return NULL;
1450 }
1451 }
1452
1453 static void
gst_base_ts_mux_release_pad(GstElement * element,GstPad * pad)1454 gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
1455 {
1456 GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1457
1458 g_mutex_lock (&mux->lock);
1459 if (mux->tsmux) {
1460 GList *cur;
1461 GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
1462 gint pid = ts_pad->pid;
1463
1464 if (ts_pad->prog) {
1465 if (ts_pad->prog->pcr_stream == ts_pad->stream) {
1466 tsmux_program_set_pcr_stream (ts_pad->prog, NULL);
1467 }
1468 if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) {
1469 g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id));
1470 }
1471 }
1472
1473 tsmux_resend_pat (mux->tsmux);
1474 tsmux_resend_si (mux->tsmux);
1475
1476 /* output PMT for each program */
1477 for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1478 TsMuxProgram *program = (TsMuxProgram *) cur->data;
1479
1480 tsmux_resend_pmt (program);
1481 }
1482 }
1483 g_mutex_unlock (&mux->lock);
1484
1485 GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
1486 }
1487
1488 /* GstAggregator implementation */
1489
1490 static void
request_keyframe(GstBaseTsMux * mux,GstClockTime running_time)1491 request_keyframe (GstBaseTsMux * mux, GstClockTime running_time)
1492 {
1493 GList *l;
1494 GST_OBJECT_LOCK (mux);
1495
1496 for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1497 gst_pad_push_event (GST_PAD (l->data),
1498 gst_video_event_new_upstream_force_key_unit (running_time, TRUE, 0));
1499 }
1500
1501 GST_OBJECT_UNLOCK (mux);
1502 }
1503
/* 256-entry lookup table for the byte-wise CRC-32 computed by
 * _calc_crc32(), indexed by (top byte of the running CRC) XOR (input byte).
 * Entry 1 is 0x04c11db7, i.e. presumably the table was generated for that
 * polynomial in MSB-first form - confirm before regenerating. */
static const guint32 crc_tab[256] = {
  0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b,
  0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
  0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, 0x4c11db70, 0x48d0c6c7,
  0x4593e01e, 0x4152fda9, 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
  0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, 0x791d4014, 0x7ddc5da3,
  0x709f7b7a, 0x745e66cd, 0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
  0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, 0xbe2b5b58, 0xbaea46ef,
  0xb7a96036, 0xb3687d81, 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
  0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49, 0xc7361b4c, 0xc3f706fb,
  0xceb42022, 0xca753d95, 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
  0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, 0x34867077, 0x30476dc0,
  0x3d044b19, 0x39c556ae, 0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
  0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, 0x018aeb13, 0x054bf6a4,
  0x0808d07d, 0x0cc9cdca, 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
  0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02, 0x5e9f46bf, 0x5a5e5b08,
  0x571d7dd1, 0x53dc6066, 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
  0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, 0xbfa1b04b, 0xbb60adfc,
  0xb6238b25, 0xb2e29692, 0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
  0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, 0xe0b41de7, 0xe4750050,
  0xe9362689, 0xedf73b3e, 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
  0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686, 0xd5b88683, 0xd1799b34,
  0xdc3abded, 0xd8fba05a, 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
  0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, 0x4f040d56, 0x4bc510e1,
  0x46863638, 0x42472b8f, 0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
  0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, 0x36194d42, 0x32d850f5,
  0x3f9b762c, 0x3b5a6b9b, 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
  0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623, 0xf12f560e, 0xf5ee4bb9,
  0xf8ad6d60, 0xfc6c70d7, 0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
  0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, 0xc423cd6a, 0xc0e2d0dd,
  0xcda1f604, 0xc960ebb3, 0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
  0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, 0x9b3660c6, 0x9ff77d71,
  0x92b45ba8, 0x9675461f, 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
  0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640, 0x4e8ee645, 0x4a4ffbf2,
  0x470cdd2b, 0x43cdc09c, 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
  0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, 0x119b4be9, 0x155a565e,
  0x18197087, 0x1cd86d30, 0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
  0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, 0x2497d08d, 0x2056cd3a,
  0x2d15ebe3, 0x29d4f654, 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
  0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c, 0xe3a1cbc1, 0xe760d676,
  0xea23f0af, 0xeee2ed18, 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
  0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662,
  0x933eb0bb, 0x97ffad0c, 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
  0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
};
1549
1550 static guint32
_calc_crc32(const guint8 * data,guint datalen)1551 _calc_crc32 (const guint8 * data, guint datalen)
1552 {
1553 gint i;
1554 guint32 crc = 0xffffffff;
1555
1556 for (i = 0; i < datalen; i++) {
1557 crc = (crc << 8) ^ crc_tab[((crc >> 24) ^ *data++) & 0xff];
1558 }
1559 return crc;
1560 }
1561
/* Scale an MPEG timestamp @t by 100000 / 9 (= 1000000000 / 90000), i.e.
 * presumably from 90 kHz ticks to GStreamer nanoseconds. The guint64 cast
 * makes the multiplication happen in 64 bits before the division. */
#define MPEGTIME_TO_GSTTIME(t) ((t) * (guint64)100000 / 9)
1563
1564 static GstMpegtsSCTESpliceEvent *
copy_splice(GstMpegtsSCTESpliceEvent * splice)1565 copy_splice (GstMpegtsSCTESpliceEvent * splice)
1566 {
1567 return g_boxed_copy (GST_TYPE_MPEGTS_SCTE_SPLICE_EVENT, splice);
1568 }
1569
1570 static void
free_splice(GstMpegtsSCTESpliceEvent * splice)1571 free_splice (GstMpegtsSCTESpliceEvent * splice)
1572 {
1573 g_boxed_free (GST_TYPE_MPEGTS_SCTE_SPLICE_EVENT, splice);
1574 }
1575
1576 /* FIXME: get rid of this when depending on glib >= 2.62 */
1577
1578 static GPtrArray *
_g_ptr_array_copy(GPtrArray * array,GCopyFunc func,GFreeFunc free_func,gpointer user_data)1579 _g_ptr_array_copy (GPtrArray * array,
1580 GCopyFunc func, GFreeFunc free_func, gpointer user_data)
1581 {
1582 GPtrArray *new_array;
1583
1584 g_return_val_if_fail (array != NULL, NULL);
1585
1586 new_array = g_ptr_array_new_with_free_func (free_func);
1587
1588 g_ptr_array_set_size (new_array, array->len);
1589
1590 if (func != NULL) {
1591 guint i;
1592
1593 for (i = 0; i < array->len; i++)
1594 new_array->pdata[i] = func (array->pdata[i], user_data);
1595 } else if (array->len > 0) {
1596 memcpy (new_array->pdata, array->pdata,
1597 array->len * sizeof (*array->pdata));
1598 }
1599
1600 new_array->len = array->len;
1601
1602 return new_array;
1603 }
1604
1605 static GstMpegtsSCTESIT *
deep_copy_sit(const GstMpegtsSCTESIT * sit)1606 deep_copy_sit (const GstMpegtsSCTESIT * sit)
1607 {
1608 GstMpegtsSCTESIT *sit_copy = g_boxed_copy (GST_TYPE_MPEGTS_SCTE_SIT, sit);
1609 GPtrArray *splices_copy =
1610 _g_ptr_array_copy (sit_copy->splices, (GCopyFunc) copy_splice,
1611 (GFreeFunc) free_splice, NULL);
1612
1613 g_ptr_array_unref (sit_copy->splices);
1614 sit_copy->splices = splices_copy;
1615
1616 return sit_copy;
1617 }
1618
1619 /* Takes ownership of @section.
1620 *
1621 * This function is a bit complex because the SCTE sections can
1622 * have various origins:
1623 *
1624 * * Sections created by the application with the gst_mpegts_scte_*_new()
1625 * API. The splice times / durations contained by these are expressed
1626 * in the GStreamer running time domain, and must be translated to
1627 * our local PES time domain. In this case, we will packetize the section
1628 * ourselves.
1629 *
1630 * * Sections passed through from tsdemux: this case is complicated as
1631 * splice times in the incoming stream may be encrypted, with pts_adjustment
1632 * being the only timing field guaranteed *not* to be encrypted. In this
1633 * case, the original binary data (section->data) will be reinjected as is
1634 * in the output stream, with pts_adjustment adjusted. tsdemux provides us
1635 * with the pts_offset it introduces, the difference between the original
1636 * PES PTSs and the running times it outputs.
1637 *
1638 * Additionally, in either of these cases when the splice times aren't encrypted
1639 * we want to make use of those to request keyframes. For the passthrough case,
1640 * as the splice times are left untouched tsdemux provides us with the running
1641 * times the section originally referred to. We cannot calculate it locally
1642 * because we would need to have access to the information that the timestamps
1643 * in the original PES domain have wrapped around, and how many times they have
1644 * done so. While we could probably make educated guesses, tsdemux (more specifically
1645 * mpegtspacketizer) already keeps track of that, and it seemed more logical to
1646 * perform the calculation there and forward it alongside the downstream events.
1647 *
1648 * Finally, while we can't request keyframes at splice points in the encrypted
1649 * case, if the input stream was compliant in that regard and no reencoding took
1650 * place the splice times will still match with valid splice points, it is up
1651 * to the application to ensure that that is the case.
1652 */
1653 static void
handle_scte35_section(GstBaseTsMux * mux,GstEvent * event,GstMpegtsSection * section,guint64 mpeg_pts_offset,GstStructure * rtime_map)1654 handle_scte35_section (GstBaseTsMux * mux, GstEvent * event,
1655 GstMpegtsSection * section, guint64 mpeg_pts_offset,
1656 GstStructure * rtime_map)
1657 {
1658 GstMpegtsSCTESIT *sit;
1659 guint i;
1660 gboolean forward = TRUE;
1661 guint64 pts_adjust;
1662 guint8 *section_data;
1663 guint8 *crc;
1664 gboolean translate = FALSE;
1665
1666 sit = (GstMpegtsSCTESIT *) gst_mpegts_section_get_scte_sit (section);
1667
1668 /* When the application injects manually constructed splice events,
1669 * their time domain is the GStreamer running time, we receive them
1670 * unpacketized and translate the fields in the SIT to local PTS.
1671 *
1672 * We make a copy of the SIT in order to make sure we can rewrite it.
1673 */
1674 if (sit->is_running_time) {
1675 sit = deep_copy_sit (sit);
1676 translate = TRUE;
1677 }
1678
1679 switch (sit->splice_command_type) {
1680 case GST_MTS_SCTE_SPLICE_COMMAND_NULL:
1681 /* We implement heartbeating ourselves */
1682 forward = FALSE;
1683 break;
1684 case GST_MTS_SCTE_SPLICE_COMMAND_SCHEDULE:
1685 /* No need to request keyframes at this point, splice_insert
1686 * messages will precede the future splice points and we
1687 * can request keyframes then. Only translate if needed.
1688 */
1689 if (translate) {
1690 for (i = 0; i < sit->splices->len; i++) {
1691 GstMpegtsSCTESpliceEvent *sevent =
1692 g_ptr_array_index (sit->splices, i);
1693
1694 if (sevent->program_splice_time_specified)
1695 sevent->program_splice_time =
1696 GSTTIME_TO_MPEGTIME (sevent->program_splice_time) +
1697 TS_MUX_CLOCK_BASE;
1698
1699 if (sevent->duration_flag)
1700 sevent->break_duration =
1701 GSTTIME_TO_MPEGTIME (sevent->break_duration);
1702 }
1703 }
1704 break;
1705 case GST_MTS_SCTE_SPLICE_COMMAND_INSERT:
1706 /* We want keyframes at splice points */
1707 if (sit->fully_parsed && (rtime_map || translate)) {
1708
1709 for (i = 0; i < sit->splices->len; i++) {
1710 guint64 running_time = GST_CLOCK_TIME_NONE;
1711
1712 GstMpegtsSCTESpliceEvent *sevent =
1713 g_ptr_array_index (sit->splices, i);
1714 if (sevent->program_splice_time_specified) {
1715 if (rtime_map) {
1716 gchar *field_name = g_strdup_printf ("event-%u-splice-time",
1717 sevent->splice_event_id);
1718 if (gst_structure_get_uint64 (rtime_map, field_name,
1719 &running_time)) {
1720 GST_DEBUG_OBJECT (mux,
1721 "Requesting keyframe for splice point at %" GST_TIME_FORMAT,
1722 GST_TIME_ARGS (running_time));
1723 request_keyframe (mux, running_time);
1724 }
1725 g_free (field_name);
1726 } else {
1727 g_assert (translate == TRUE);
1728 running_time = sevent->program_splice_time;
1729 GST_DEBUG_OBJECT (mux,
1730 "Requesting keyframe for splice point at %" GST_TIME_FORMAT,
1731 GST_TIME_ARGS (running_time));
1732 request_keyframe (mux, running_time);
1733 sevent->program_splice_time =
1734 GSTTIME_TO_MPEGTIME (running_time) + TS_MUX_CLOCK_BASE;
1735 }
1736 } else {
1737 GST_DEBUG_OBJECT (mux,
1738 "Requesting keyframe for immediate splice point");
1739 request_keyframe (mux, GST_CLOCK_TIME_NONE);
1740 }
1741
1742 if (sevent->duration_flag) {
1743 if (translate) {
1744 sevent->break_duration =
1745 GSTTIME_TO_MPEGTIME (sevent->break_duration);
1746 }
1747
1748 /* Even if auto_return is FALSE, when a break_duration is specified it
1749 * is intended as a redundancy mechanism in case the follow-up
1750 * splice insert goes missing.
1751 *
1752 * Schedule a keyframe at that point (if we can calculate its position
1753 * accurately).
1754 */
1755 if (GST_CLOCK_TIME_IS_VALID (running_time)) {
1756 running_time += MPEGTIME_TO_GSTTIME (sevent->break_duration);
1757 GST_DEBUG_OBJECT (mux,
1758 "Requesting keyframe for end of break at %" GST_TIME_FORMAT,
1759 GST_TIME_ARGS (running_time));
1760 request_keyframe (mux, running_time);
1761 }
1762 }
1763 }
1764 }
1765 break;
1766 case GST_MTS_SCTE_SPLICE_COMMAND_TIME:{
1767 /* Adjust timestamps and potentially request keyframes */
1768 gboolean do_request_keyframes = FALSE;
1769
1770 /* TODO: we can probably be a little more fine-tuned about determining
1771 * whether a keyframe is actually needed, but this at least takes care
1772 * of the requirement in 10.3.4 that a keyframe should not be created
1773 * when the signal contains only a time_descriptor.
1774 */
1775 if (sit->fully_parsed && (rtime_map || translate)) {
1776 for (i = 0; i < sit->descriptors->len; i++) {
1777 GstMpegtsDescriptor *descriptor =
1778 g_ptr_array_index (sit->descriptors, i);
1779
1780 switch (descriptor->tag) {
1781 case GST_MTS_SCTE_DESC_AVAIL:
1782 case GST_MTS_SCTE_DESC_DTMF:
1783 case GST_MTS_SCTE_DESC_SEGMENTATION:
1784 do_request_keyframes = TRUE;
1785 break;
1786 case GST_MTS_SCTE_DESC_TIME:
1787 case GST_MTS_SCTE_DESC_AUDIO:
1788 break;
1789 }
1790
1791 if (do_request_keyframes)
1792 break;
1793 }
1794
1795 if (sit->splice_time_specified) {
1796 GstClockTime running_time = GST_CLOCK_TIME_NONE;
1797
1798 if (rtime_map) {
1799 if (do_request_keyframes
1800 && gst_structure_get_uint64 (rtime_map, "splice-time",
1801 &running_time)) {
1802 GST_DEBUG_OBJECT (mux,
1803 "Requesting keyframe for time signal at %" GST_TIME_FORMAT,
1804 GST_TIME_ARGS (running_time));
1805 request_keyframe (mux, running_time);
1806 }
1807 } else {
1808 g_assert (translate);
1809 running_time = sit->splice_time;
1810 sit->splice_time =
1811 GSTTIME_TO_MPEGTIME (running_time) + TS_MUX_CLOCK_BASE;
1812 if (do_request_keyframes) {
1813 GST_DEBUG_OBJECT (mux,
1814 "Requesting keyframe for time signal at %" GST_TIME_FORMAT,
1815 GST_TIME_ARGS (running_time));
1816 request_keyframe (mux, running_time);
1817 }
1818 }
1819 } else if (do_request_keyframes) {
1820 GST_DEBUG_OBJECT (mux,
1821 "Requesting keyframe for immediate time signal");
1822 request_keyframe (mux, GST_CLOCK_TIME_NONE);
1823 }
1824 }
1825 break;
1826 }
1827 case GST_MTS_SCTE_SPLICE_COMMAND_BANDWIDTH:
1828 case GST_MTS_SCTE_SPLICE_COMMAND_PRIVATE:
1829 /* Just let those go through untouched, none of our business */
1830 break;
1831 default:
1832 break;
1833 }
1834
1835 if (!forward) {
1836 gst_mpegts_section_unref (section);
1837 return;
1838 }
1839
1840 if (!translate) {
1841 g_assert (section->data);
1842 /* Calculate the final adjustment, as a sum of:
1843 * - The adjustment in the original packet
1844 * - The offset introduced between the original local PTS
1845 * and the GStreamer PTS output by tsdemux
1846 * - Our own 1-hour offset
1847 */
1848 pts_adjust = sit->pts_adjustment + mpeg_pts_offset + TS_MUX_CLOCK_BASE;
1849
1850 /* Account for offsets potentially introduced between the demuxer and us */
1851 pts_adjust +=
1852 GSTTIME_TO_MPEGTIME (gst_event_get_running_time_offset (event));
1853
1854 pts_adjust &= 0x1ffffffff;
1855 section_data = g_memdup2 (section->data, section->section_length);
1856 section_data[4] |= pts_adjust >> 32;
1857 section_data[5] = pts_adjust >> 24;
1858 section_data[6] = pts_adjust >> 16;
1859 section_data[7] = pts_adjust >> 8;
1860 section_data[8] = pts_adjust;
1861
1862 /* Now rewrite our checksum */
1863 crc = section_data + section->section_length - 4;
1864 GST_WRITE_UINT32_BE (crc, _calc_crc32 (section_data, crc - section_data));
1865
1866 GST_OBJECT_LOCK (mux);
1867 GST_DEBUG_OBJECT (mux, "Storing SCTE section");
1868 if (mux->pending_scte35_section)
1869 gst_mpegts_section_unref (mux->pending_scte35_section);
1870 mux->pending_scte35_section =
1871 gst_mpegts_section_new (mux->scte35_pid, section_data,
1872 section->section_length);
1873 GST_OBJECT_UNLOCK (mux);
1874
1875 gst_mpegts_section_unref (section);
1876 } else {
1877 GST_OBJECT_LOCK (mux);
1878 GST_DEBUG_OBJECT (mux, "Storing SCTE section");
1879 gst_mpegts_section_unref (section);
1880 if (mux->pending_scte35_section)
1881 gst_mpegts_section_unref (mux->pending_scte35_section);
1882 mux->pending_scte35_section =
1883 gst_mpegts_section_from_scte_sit (sit, mux->scte35_pid);;
1884 GST_OBJECT_UNLOCK (mux);
1885 }
1886 }
1887
1888 static gboolean
gst_base_ts_mux_send_event(GstElement * element,GstEvent * event)1889 gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
1890 {
1891 GstMpegtsSection *section;
1892 GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1893
1894 section = gst_event_parse_mpegts_section (event);
1895
1896 if (section) {
1897 GST_DEBUG ("Received event with mpegts section");
1898
1899 if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
1900 handle_scte35_section (mux, event, section, 0, NULL);
1901 } else {
1902 g_mutex_lock (&mux->lock);
1903 /* TODO: Check that the section type is supported */
1904 tsmux_add_mpegts_si_section (mux->tsmux, section);
1905 g_mutex_unlock (&mux->lock);
1906 }
1907
1908 gst_event_unref (event);
1909
1910 return TRUE;
1911 }
1912
1913 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1914 }
1915
1916 /* GstAggregator implementation */
1917
1918 static gboolean
gst_base_ts_mux_sink_event(GstAggregator * agg,GstAggregatorPad * agg_pad,GstEvent * event)1919 gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
1920 GstEvent * event)
1921 {
1922 GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
1923 GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1924 GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad);
1925 gboolean res = FALSE;
1926 gboolean forward = TRUE;
1927
1928 switch (GST_EVENT_TYPE (event)) {
1929 case GST_EVENT_CAPS:
1930 {
1931 GstCaps *caps;
1932 GstFlowReturn ret;
1933 GList *cur;
1934
1935 g_mutex_lock (&mux->lock);
1936 if (ts_pad->stream == NULL) {
1937 g_mutex_unlock (&mux->lock);
1938 break;
1939 }
1940
1941 forward = FALSE;
1942
1943 gst_event_parse_caps (event, &caps);
1944 if (!caps || !gst_caps_is_fixed (caps)) {
1945 g_mutex_unlock (&mux->lock);
1946 break;
1947 }
1948
1949 ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
1950 if (ret != GST_FLOW_OK) {
1951 g_mutex_unlock (&mux->lock);
1952 break;
1953 }
1954
1955 mux->tsmux->pat_changed = TRUE;
1956 mux->tsmux->si_changed = TRUE;
1957 tsmux_resend_pat (mux->tsmux);
1958 tsmux_resend_si (mux->tsmux);
1959
1960 /* output PMT for each program */
1961 for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1962 TsMuxProgram *program = (TsMuxProgram *) cur->data;
1963
1964 program->pmt_changed = TRUE;
1965 tsmux_resend_pmt (program);
1966 }
1967 g_mutex_unlock (&mux->lock);
1968
1969 res = TRUE;
1970 break;
1971 }
1972 case GST_EVENT_CUSTOM_DOWNSTREAM:
1973 {
1974 GstClockTime timestamp, stream_time, running_time;
1975 gboolean all_headers;
1976 guint count;
1977 const GstStructure *s;
1978
1979 s = gst_event_get_structure (event);
1980
1981 if (gst_structure_has_name (s, "scte-sit") && mux->scte35_pid != 0) {
1982
1983 /* When operating downstream of tsdemux, tsdemux will send out events
1984 * on all its source pads for each splice table it encounters. If we
1985 * are remuxing multiple streams it has demuxed, this means we could
1986 * unnecessarily repeat the same table multiple times, we avoid that
1987 * by deduplicating thanks to the event sequm
1988 */
1989 if (gst_event_get_seqnum (event) != mux->last_scte35_event_seqnum) {
1990 GstMpegtsSection *section;
1991
1992 gst_structure_get (s, "section", GST_TYPE_MPEGTS_SECTION, §ion,
1993 NULL);
1994 if (section) {
1995 guint64 mpeg_pts_offset = 0;
1996 GstStructure *rtime_map = NULL;
1997
1998 gst_structure_get (s, "running-time-map", GST_TYPE_STRUCTURE,
1999 &rtime_map, NULL);
2000 gst_structure_get_uint64 (s, "mpeg-pts-offset", &mpeg_pts_offset);
2001
2002 handle_scte35_section (mux, event, section, mpeg_pts_offset,
2003 rtime_map);
2004 if (rtime_map)
2005 gst_structure_free (rtime_map);
2006 mux->last_scte35_event_seqnum = gst_event_get_seqnum (event);
2007 } else {
2008 GST_WARNING_OBJECT (ts_pad,
2009 "Ignoring scte-sit event without a section");
2010 }
2011 } else {
2012 GST_DEBUG_OBJECT (ts_pad, "Ignoring duplicate scte-sit event");
2013 }
2014 res = TRUE;
2015 forward = FALSE;
2016 goto out;
2017 }
2018
2019 if (!gst_video_event_is_force_key_unit (event))
2020 goto out;
2021
2022 res = TRUE;
2023 forward = FALSE;
2024
2025 gst_video_event_parse_downstream_force_key_unit (event,
2026 ×tamp, &stream_time, &running_time, &all_headers, &count);
2027 GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, "
2028 "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
2029 gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
2030
2031 if (mux->force_key_unit_event != NULL) {
2032 GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
2033 "as an upstream force key unit is already queued");
2034 goto out;
2035 }
2036
2037 if (!all_headers)
2038 goto out;
2039
2040 mux->pending_key_unit_ts = running_time;
2041 gst_event_replace (&mux->force_key_unit_event, event);
2042 break;
2043 }
2044 case GST_EVENT_TAG:{
2045 GstTagList *list;
2046 gchar *lang = NULL;
2047
2048 GST_DEBUG_OBJECT (mux, "received tag event");
2049 gst_event_parse_tag (event, &list);
2050
2051 /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
2052 if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
2053 const gchar *lang_code;
2054
2055 lang_code = gst_tag_get_language_code_iso_639_2B (lang);
2056 if (lang_code) {
2057 GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code);
2058
2059 g_free (ts_pad->language);
2060 ts_pad->language = g_strdup (lang_code);
2061 } else {
2062 GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'",
2063 lang);
2064 }
2065 g_free (lang);
2066 }
2067
2068 /* handled this, don't want collectpads to forward it downstream */
2069 res = TRUE;
2070 forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
2071 break;
2072 }
2073 case GST_EVENT_STREAM_START:{
2074 GstStreamFlags flags;
2075
2076 gst_event_parse_stream_flags (event, &flags);
2077
2078 /* Don't wait for data on sparse inputs like metadata streams */
2079 /*
2080 if ((flags & GST_STREAM_FLAG_SPARSE)) {
2081 GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
2082 gst_collect_pads_set_waiting (pads, data, FALSE);
2083 GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
2084 }
2085 */
2086 break;
2087 }
2088 default:
2089 break;
2090 }
2091
2092 out:
2093 if (!forward)
2094 gst_event_unref (event);
2095 else
2096 res = agg_class->sink_event (agg, agg_pad, event);
2097
2098 return res;
2099 }
2100
2101 static gboolean
gst_base_ts_mux_src_event(GstAggregator * agg,GstEvent * event)2102 gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
2103 {
2104 GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
2105 GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2106 gboolean res = TRUE, forward = TRUE;
2107
2108 switch (GST_EVENT_TYPE (event)) {
2109 case GST_EVENT_CUSTOM_UPSTREAM:
2110 {
2111 GstIterator *iter;
2112 GValue sinkpad_value = G_VALUE_INIT;
2113 GstClockTime running_time;
2114 gboolean all_headers, done = FALSE;
2115 guint count;
2116
2117 if (!gst_video_event_is_force_key_unit (event))
2118 break;
2119
2120 forward = FALSE;
2121
2122 gst_video_event_parse_upstream_force_key_unit (event,
2123 &running_time, &all_headers, &count);
2124
2125 GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
2126 "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
2127 gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
2128 all_headers, count);
2129
2130 if (!all_headers)
2131 break;
2132
2133 mux->pending_key_unit_ts = running_time;
2134 gst_event_replace (&mux->force_key_unit_event, event);
2135
2136 iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
2137
2138 while (!done) {
2139 switch (gst_iterator_next (iter, &sinkpad_value)) {
2140 case GST_ITERATOR_OK:{
2141 GstPad *sinkpad = g_value_get_object (&sinkpad_value);
2142 gboolean tmp;
2143
2144 GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
2145 tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
2146 GST_INFO_OBJECT (mux, "result %d", tmp);
2147 /* succeed if at least one pad succeeds */
2148 res |= tmp;
2149 break;
2150 }
2151 case GST_ITERATOR_DONE:
2152 done = TRUE;
2153 break;
2154 case GST_ITERATOR_RESYNC:
2155 gst_iterator_resync (iter);
2156 break;
2157 case GST_ITERATOR_ERROR:
2158 g_assert_not_reached ();
2159 break;
2160 }
2161 g_value_reset (&sinkpad_value);
2162 }
2163 g_value_unset (&sinkpad_value);
2164 gst_iterator_free (iter);
2165 break;
2166 }
2167 default:
2168 break;
2169 }
2170
2171 if (forward)
2172 res = agg_class->src_event (agg, event);
2173 else
2174 gst_event_unref (event);
2175
2176 return res;
2177 }
2178
2179 static GstBuffer *
gst_base_ts_mux_clip(GstAggregator * agg,GstAggregatorPad * agg_pad,GstBuffer * buf)2180 gst_base_ts_mux_clip (GstAggregator * agg,
2181 GstAggregatorPad * agg_pad, GstBuffer * buf)
2182 {
2183 GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad);
2184 GstClockTime time;
2185 GstBuffer *ret;
2186
2187 ret = buf;
2188
2189 /* PTS */
2190 time = GST_BUFFER_PTS (buf);
2191
2192 /* invalid left alone and passed */
2193 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
2194 time =
2195 gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time);
2196 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
2197 GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment");
2198 gst_buffer_unref (buf);
2199 ret = NULL;
2200 goto beach;
2201 } else {
2202 GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " -> %"
2203 GST_TIME_FORMAT " running time",
2204 GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
2205 buf = ret = gst_buffer_make_writable (buf);
2206 GST_BUFFER_PTS (ret) = time;
2207 }
2208 }
2209
2210 /* DTS */
2211 time = GST_BUFFER_DTS (buf);
2212
2213 /* invalid left alone and passed */
2214 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
2215 gint sign;
2216 gint64 dts;
2217
2218 sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME,
2219 time, &time);
2220
2221 if (sign > 0)
2222 dts = (gint64) time;
2223 else
2224 dts = -((gint64) time);
2225
2226 GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %"
2227 GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
2228 GST_STIME_ARGS (dts));
2229
2230 if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) {
2231 /* Ignore DTS going backward */
2232 GST_WARNING_OBJECT (pad, "ignoring DTS going backward");
2233 dts = pad->dts;
2234 }
2235
2236 ret = gst_buffer_make_writable (buf);
2237 if (sign > 0)
2238 GST_BUFFER_DTS (ret) = time;
2239 else
2240 GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE;
2241
2242 pad->dts = dts;
2243 } else {
2244 pad->dts = GST_CLOCK_STIME_NONE;
2245 }
2246
2247 beach:
2248 return ret;
2249 }
2250
2251 static GstBaseTsMuxPad *
gst_base_ts_mux_find_best_pad(GstAggregator * aggregator)2252 gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
2253 {
2254 GstBaseTsMuxPad *best = NULL;
2255 GstClockTime best_ts = GST_CLOCK_TIME_NONE;
2256 GList *l;
2257
2258 GST_OBJECT_LOCK (aggregator);
2259
2260 for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
2261 GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
2262 GstAggregatorPad *apad = GST_AGGREGATOR_PAD_CAST (tpad);
2263 GstBuffer *buffer;
2264
2265 buffer = gst_aggregator_pad_peek_buffer (apad);
2266 if (!buffer)
2267 continue;
2268 if (best_ts == GST_CLOCK_TIME_NONE) {
2269 best = tpad;
2270 best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
2271 } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
2272 GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
2273 if (t < best_ts) {
2274 best = tpad;
2275 best_ts = t;
2276 }
2277 }
2278 gst_buffer_unref (buffer);
2279 }
2280
2281 if (best)
2282 gst_object_ref (best);
2283
2284 GST_OBJECT_UNLOCK (aggregator);
2285
2286 GST_DEBUG_OBJECT (aggregator,
2287 "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
2288 GST_TIME_ARGS (best_ts), best);
2289
2290 return best;
2291 }
2292
2293 static gboolean
gst_base_ts_mux_are_all_pads_eos(GstBaseTsMux * mux)2294 gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
2295 {
2296 GList *l;
2297 gboolean ret = TRUE;
2298
2299 GST_OBJECT_LOCK (mux);
2300
2301 for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
2302 GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
2303
2304 if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
2305 ret = FALSE;
2306 break;
2307 }
2308 }
2309
2310 GST_OBJECT_UNLOCK (mux);
2311
2312 return ret;
2313 }
2314
2315
2316 static GstFlowReturn
gst_base_ts_mux_aggregate(GstAggregator * agg,gboolean timeout)2317 gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
2318 {
2319 GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2320 GstFlowReturn ret = GST_FLOW_OK;
2321 GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg);
2322 GstCaps *caps;
2323
2324 /* set caps on the srcpad if no caps were set yet */
2325 if (!(caps = gst_pad_get_current_caps (agg->srcpad))) {
2326 GstStructure *structure;
2327
2328 caps = gst_pad_get_pad_template_caps (GST_AGGREGATOR_SRC_PAD (mux));
2329 caps = gst_caps_make_writable (caps);
2330 structure = gst_caps_get_structure (caps, 0);
2331 gst_structure_set (structure, "packetsize", G_TYPE_INT, mux->packet_size,
2332 NULL);
2333
2334 gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
2335 }
2336 gst_caps_unref (caps);
2337
2338 if (best) {
2339 GstBuffer *buffer;
2340
2341 buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best));
2342 if (!buffer) {
2343 /* We might have gotten a flush event after we picked the pad */
2344 goto done;
2345 }
2346
2347 ret =
2348 gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
2349 GST_AGGREGATOR_PAD (best), buffer);
2350
2351 gst_object_unref (best);
2352
2353 if (ret != GST_FLOW_OK)
2354 goto done;
2355 }
2356
2357 if (gst_base_ts_mux_are_all_pads_eos (mux)) {
2358 GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
2359 /* drain some possibly cached data */
2360 if (klass->drain)
2361 klass->drain (mux);
2362 gst_base_ts_mux_push_packets (mux, TRUE);
2363
2364 ret = GST_FLOW_EOS;
2365 }
2366
2367 done:
2368 return ret;
2369 }
2370
2371 static gboolean
gst_base_ts_mux_start(GstAggregator * agg)2372 gst_base_ts_mux_start (GstAggregator * agg)
2373 {
2374 GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2375
2376 g_mutex_lock (&mux->lock);
2377 gst_base_ts_mux_reset (mux, TRUE);
2378 g_mutex_unlock (&mux->lock);
2379
2380 return TRUE;
2381 }
2382
2383 static gboolean
gst_base_ts_mux_stop(GstAggregator * agg)2384 gst_base_ts_mux_stop (GstAggregator * agg)
2385 {
2386 GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2387
2388 g_mutex_lock (&mux->lock);
2389 gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
2390 g_mutex_unlock (&mux->lock);
2391
2392 return TRUE;
2393 }
2394
2395 /* GObject implementation */
2396
2397 static void
gst_base_ts_mux_dispose(GObject * object)2398 gst_base_ts_mux_dispose (GObject * object)
2399 {
2400 GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2401
2402 g_mutex_lock (&mux->lock);
2403 gst_base_ts_mux_reset (mux, FALSE);
2404
2405 if (mux->out_adapter) {
2406 g_object_unref (mux->out_adapter);
2407 mux->out_adapter = NULL;
2408 }
2409 if (mux->prog_map) {
2410 gst_structure_free (mux->prog_map);
2411 mux->prog_map = NULL;
2412 }
2413 if (mux->programs) {
2414 g_hash_table_destroy (mux->programs);
2415 mux->programs = NULL;
2416 }
2417 g_mutex_unlock (&mux->lock);
2418 GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
2419 }
2420
2421 static void
gst_base_ts_mux_finalize(GObject * object)2422 gst_base_ts_mux_finalize (GObject * object)
2423 {
2424 GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2425
2426 g_mutex_clear (&mux->lock);
2427 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
2428 }
2429
2430 static void
gst_base_ts_mux_constructed(GObject * object)2431 gst_base_ts_mux_constructed (GObject * object)
2432 {
2433 GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2434
2435 /* initial state */
2436 g_mutex_lock (&mux->lock);
2437 gst_base_ts_mux_reset (mux, TRUE);
2438 g_mutex_unlock (&mux->lock);
2439 }
2440
2441 static void
gst_base_ts_mux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)2442 gst_base_ts_mux_set_property (GObject * object, guint prop_id,
2443 const GValue * value, GParamSpec * pspec)
2444 {
2445 GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2446 GList *l;
2447
2448 switch (prop_id) {
2449 case PROP_PROG_MAP:
2450 {
2451 const GstStructure *s = gst_value_get_structure (value);
2452 if (mux->prog_map) {
2453 gst_structure_free (mux->prog_map);
2454 }
2455 if (s)
2456 mux->prog_map = gst_structure_copy (s);
2457 else
2458 mux->prog_map = NULL;
2459 break;
2460 }
2461 case PROP_PAT_INTERVAL:
2462 mux->pat_interval = g_value_get_uint (value);
2463 g_mutex_lock (&mux->lock);
2464 if (mux->tsmux)
2465 tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
2466 g_mutex_unlock (&mux->lock);
2467 break;
2468 case PROP_PMT_INTERVAL:
2469 mux->pmt_interval = g_value_get_uint (value);
2470 GST_OBJECT_LOCK (mux);
2471 for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
2472 GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
2473
2474 g_mutex_lock (&mux->lock);
2475 tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
2476 g_mutex_unlock (&mux->lock);
2477 }
2478 GST_OBJECT_UNLOCK (mux);
2479 break;
2480 case PROP_ALIGNMENT:
2481 mux->alignment = g_value_get_int (value);
2482 break;
2483 case PROP_SI_INTERVAL:
2484 mux->si_interval = g_value_get_uint (value);
2485 g_mutex_lock (&mux->lock);
2486 tsmux_set_si_interval (mux->tsmux, mux->si_interval);
2487 g_mutex_unlock (&mux->lock);
2488 break;
2489 case PROP_BITRATE:
2490 mux->bitrate = g_value_get_uint64 (value);
2491 g_mutex_lock (&mux->lock);
2492 if (mux->tsmux)
2493 tsmux_set_bitrate (mux->tsmux, mux->bitrate);
2494 g_mutex_unlock (&mux->lock);
2495 break;
2496 case PROP_PCR_INTERVAL:
2497 mux->pcr_interval = g_value_get_uint (value);
2498 g_mutex_lock (&mux->lock);
2499 if (mux->tsmux)
2500 tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
2501 g_mutex_unlock (&mux->lock);
2502 break;
2503 case PROP_SCTE_35_PID:
2504 mux->scte35_pid = g_value_get_uint (value);
2505 break;
2506 case PROP_SCTE_35_NULL_INTERVAL:
2507 mux->scte35_null_interval = g_value_get_uint (value);
2508 break;
2509 default:
2510 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2511 break;
2512 }
2513 }
2514
2515 static void
gst_base_ts_mux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)2516 gst_base_ts_mux_get_property (GObject * object, guint prop_id,
2517 GValue * value, GParamSpec * pspec)
2518 {
2519 GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2520
2521 switch (prop_id) {
2522 case PROP_PROG_MAP:
2523 gst_value_set_structure (value, mux->prog_map);
2524 break;
2525 case PROP_PAT_INTERVAL:
2526 g_value_set_uint (value, mux->pat_interval);
2527 break;
2528 case PROP_PMT_INTERVAL:
2529 g_value_set_uint (value, mux->pmt_interval);
2530 break;
2531 case PROP_ALIGNMENT:
2532 g_value_set_int (value, mux->alignment);
2533 break;
2534 case PROP_SI_INTERVAL:
2535 g_value_set_uint (value, mux->si_interval);
2536 break;
2537 case PROP_BITRATE:
2538 g_value_set_uint64 (value, mux->bitrate);
2539 break;
2540 case PROP_PCR_INTERVAL:
2541 g_value_set_uint (value, mux->pcr_interval);
2542 break;
2543 case PROP_SCTE_35_PID:
2544 g_value_set_uint (value, mux->scte35_pid);
2545 break;
2546 case PROP_SCTE_35_NULL_INTERVAL:
2547 g_value_set_uint (value, mux->scte35_null_interval);
2548 break;
2549 default:
2550 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2551 break;
2552 }
2553 }
2554
2555 /* Default vmethods implementation */
2556
2557 static TsMux *
gst_base_ts_mux_default_create_ts_mux(GstBaseTsMux * mux)2558 gst_base_ts_mux_default_create_ts_mux (GstBaseTsMux * mux)
2559 {
2560 TsMux *tsmux = tsmux_new ();
2561 tsmux_set_write_func (tsmux, new_packet_cb, mux);
2562 tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux);
2563 tsmux_set_pat_interval (tsmux, mux->pat_interval);
2564 tsmux_set_si_interval (tsmux, mux->si_interval);
2565 tsmux_set_bitrate (tsmux, mux->bitrate);
2566 tsmux_set_pcr_interval (tsmux, mux->pcr_interval);
2567
2568 return tsmux;
2569 }
2570
2571 static void
gst_base_ts_mux_default_allocate_packet(GstBaseTsMux * mux,GstBuffer ** buffer)2572 gst_base_ts_mux_default_allocate_packet (GstBaseTsMux * mux,
2573 GstBuffer ** buffer)
2574 {
2575 GstBuffer *buf;
2576
2577 buf = gst_buffer_new_and_alloc (mux->packet_size);
2578
2579 *buffer = buf;
2580 }
2581
2582 static gboolean
gst_base_ts_mux_default_output_packet(GstBaseTsMux * mux,GstBuffer * buffer,gint64 new_pcr)2583 gst_base_ts_mux_default_output_packet (GstBaseTsMux * mux, GstBuffer * buffer,
2584 gint64 new_pcr)
2585 {
2586 gst_base_ts_mux_collect_packet (mux, buffer);
2587
2588 return TRUE;
2589 }
2590
2591 /* Subclass API */
2592
2593 void
gst_base_ts_mux_set_packet_size(GstBaseTsMux * mux,gsize size)2594 gst_base_ts_mux_set_packet_size (GstBaseTsMux * mux, gsize size)
2595 {
2596 mux->packet_size = size;
2597 }
2598
2599 void
gst_base_ts_mux_set_automatic_alignment(GstBaseTsMux * mux,gsize alignment)2600 gst_base_ts_mux_set_automatic_alignment (GstBaseTsMux * mux, gsize alignment)
2601 {
2602 mux->automatic_alignment = alignment;
2603 }
2604
2605 static void
gst_base_ts_mux_class_init(GstBaseTsMuxClass * klass)2606 gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
2607 {
2608 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
2609 GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
2610 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
2611
2612 GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0,
2613 "MPEG Transport Stream muxer");
2614
2615 gst_element_class_set_static_metadata (gstelement_class,
2616 "MPEG Transport Stream Muxer", "Codec/Muxer",
2617 "Multiplexes media streams into an MPEG Transport Stream",
2618 "Fluendo <contact@fluendo.com>");
2619
2620 gobject_class->set_property =
2621 GST_DEBUG_FUNCPTR (gst_base_ts_mux_set_property);
2622 gobject_class->get_property =
2623 GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
2624 gobject_class->dispose = gst_base_ts_mux_dispose;
2625 gobject_class->finalize = gst_base_ts_mux_finalize;
2626 gobject_class->constructed = gst_base_ts_mux_constructed;
2627
2628 gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
2629 gstelement_class->release_pad = gst_base_ts_mux_release_pad;
2630 gstelement_class->send_event = gst_base_ts_mux_send_event;
2631
2632 gstagg_class->negotiate = NULL;
2633 gstagg_class->aggregate = gst_base_ts_mux_aggregate;
2634 gstagg_class->clip = gst_base_ts_mux_clip;
2635 gstagg_class->sink_event = gst_base_ts_mux_sink_event;
2636 gstagg_class->src_event = gst_base_ts_mux_src_event;
2637 gstagg_class->start = gst_base_ts_mux_start;
2638 gstagg_class->stop = gst_base_ts_mux_stop;
2639
2640 klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
2641 klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;
2642 klass->output_packet = gst_base_ts_mux_default_output_packet;
2643
2644 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PROG_MAP,
2645 g_param_spec_boxed ("prog-map", "Program map",
2646 "A GstStructure specifies the mapping from elementary streams to programs",
2647 GST_TYPE_STRUCTURE,
2648 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2649
2650 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PAT_INTERVAL,
2651 g_param_spec_uint ("pat-interval", "PAT interval",
2652 "Set the interval (in ticks of the 90kHz clock) for writing out the PAT table",
2653 1, G_MAXUINT, TSMUX_DEFAULT_PAT_INTERVAL,
2654 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2655
2656 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PMT_INTERVAL,
2657 g_param_spec_uint ("pmt-interval", "PMT interval",
2658 "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
2659 1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
2660 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2661
2662 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_ALIGNMENT,
2663 g_param_spec_int ("alignment", "packet alignment",
2664 "Number of packets per buffer (padded with dummy packets on EOS) "
2665 "(-1 = auto, 0 = all available packets, 7 for UDP streaming)",
2666 -1, G_MAXINT, BASETSMUX_DEFAULT_ALIGNMENT,
2667 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2668
2669 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SI_INTERVAL,
2670 g_param_spec_uint ("si-interval", "SI interval",
2671 "Set the interval (in ticks of the 90kHz clock) for writing out the Service"
2672 "Information tables", 1, G_MAXUINT, TSMUX_DEFAULT_SI_INTERVAL,
2673 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2674
2675 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BITRATE,
2676 g_param_spec_uint64 ("bitrate", "Bitrate (in bits per second)",
2677 "Set the target bitrate, will insert null packets as padding "
2678 " to achieve multiplex-wide constant bitrate (0 means no padding)",
2679 0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE,
2680 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2681
2682 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PCR_INTERVAL,
2683 g_param_spec_uint ("pcr-interval", "PCR interval",
2684 "Set the interval (in ticks of the 90kHz clock) for writing PCR",
2685 1, G_MAXUINT, TSMUX_DEFAULT_PCR_INTERVAL,
2686 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2687
2688 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SCTE_35_PID,
2689 g_param_spec_uint ("scte-35-pid", "SCTE-35 PID",
2690 "PID to use for inserting SCTE-35 packets (0: unused)",
2691 0, G_MAXUINT, DEFAULT_SCTE_35_PID,
2692 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2693
2694 g_object_class_install_property (G_OBJECT_CLASS (klass),
2695 PROP_SCTE_35_NULL_INTERVAL, g_param_spec_uint ("scte-35-null-interval",
2696 "SCTE-35 NULL packet interval",
2697 "Set the interval (in ticks of the 90kHz clock) for writing SCTE-35 NULL (heartbeat) packets."
2698 " (only valid if scte-35-pid is different from 0)", 1, G_MAXUINT,
2699 TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL,
2700 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2701
2702 gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
2703 &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
2704
2705 gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX_PAD, 0);
2706 }
2707
2708 static void
gst_base_ts_mux_init(GstBaseTsMux * mux)2709 gst_base_ts_mux_init (GstBaseTsMux * mux)
2710 {
2711 mux->out_adapter = gst_adapter_new ();
2712
2713 /* properties */
2714 mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
2715 mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
2716 mux->si_interval = TSMUX_DEFAULT_SI_INTERVAL;
2717 mux->pcr_interval = TSMUX_DEFAULT_PCR_INTERVAL;
2718 mux->prog_map = NULL;
2719 mux->alignment = BASETSMUX_DEFAULT_ALIGNMENT;
2720 mux->bitrate = TSMUX_DEFAULT_BITRATE;
2721 mux->scte35_pid = DEFAULT_SCTE_35_PID;
2722 mux->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
2723
2724 mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
2725 mux->automatic_alignment = 0;
2726
2727 g_mutex_init (&mux->lock);
2728 }
2729