• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2006, 2007, 2008, 2009, 2010 Fluendo S.A.
3  *  Authors: Jan Schmidt <jan@fluendo.com>
4  *           Kapil Agrawal <kapil@fluendo.com>
5  *           Julien Moutte <julien@fluendo.com>
6  *
7  * Copyright (C) 2011 Jan Schmidt <thaytan@noraisin.net>
8  *
9  * This library is licensed under 3 different licenses and you
10  * can choose to use it under the terms of any one of them. The
11  * three licenses are the MPL 1.1, the LGPL and the MIT license.
12  *
13  * MPL:
14  *
15  * The contents of this file are subject to the Mozilla Public License
16  * Version 1.1 (the "License"); you may not use this file except in
17  * compliance with the License. You may obtain a copy of the License at
18  * http://www.mozilla.org/MPL/.
19  *
20  * Software distributed under the License is distributed on an "AS IS"
21  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
22  * License for the specific language governing rights and limitations
23  * under the License.
24  *
25  * LGPL:
26  *
27  * This library is free software; you can redistribute it and/or
28  * modify it under the terms of the GNU Library General Public
29  * License as published by the Free Software Foundation; either
30  * version 2 of the License, or (at your option) any later version.
31  *
32  * This library is distributed in the hope that it will be useful,
33  * but WITHOUT ANY WARRANTY; without even the implied warranty of
34  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
35  * Library General Public License for more details.
36  *
37  * You should have received a copy of the GNU Library General Public
38  * License along with this library; if not, write to the
39  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
40  * Boston, MA 02110-1301, USA.
41  *
42  * MIT:
43  *
44  * Unless otherwise indicated, Source Code is licensed under MIT license.
45  * See further explanation attached in License Statement (distributed in the file
46  * LICENSE).
47  *
48  * Permission is hereby granted, free of charge, to any person obtaining a copy of
49  * this software and associated documentation files (the "Software"), to deal in
50  * the Software without restriction, including without limitation the rights to
51  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
52  * of the Software, and to permit persons to whom the Software is furnished to do
53  * so, subject to the following conditions:
54  *
55  * The above copyright notice and this permission notice shall be included in all
56  * copies or substantial portions of the Software.
57  *
58  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
59  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
60  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
61  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
62  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
63  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
64  * SOFTWARE.
65  *
66  * SPDX-License-Identifier: MPL-1.1 OR MIT OR LGPL-2.0-or-later
67  */
68 #ifdef HAVE_CONFIG_H
69 #include "config.h"
70 #endif
71 
72 #include <stdio.h>
73 #include <string.h>
74 
75 #include <gst/tag/tag.h>
76 #include <gst/video/video.h>
77 #include <gst/mpegts/mpegts.h>
78 #include <gst/pbutils/pbutils.h>
79 #include <gst/videoparsers/gstjpeg2000parse.h>
80 #include <gst/video/video-color.h>
81 
82 #include "gstbasetsmux.h"
83 #include "gstbasetsmuxaac.h"
84 #include "gstbasetsmuxttxt.h"
85 #include "gstbasetsmuxopus.h"
86 #include "gstbasetsmuxjpeg2000.h"
87 
88 GST_DEBUG_CATEGORY (gst_base_ts_mux_debug);
89 #define GST_CAT_DEFAULT gst_base_ts_mux_debug
90 
91 /* GstBaseTsMuxPad */
92 
93 G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD);
94 
95 /* Internals */
96 
97 static void
gst_base_ts_mux_pad_reset(GstBaseTsMuxPad * pad)98 gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad)
99 {
100   pad->dts = GST_CLOCK_STIME_NONE;
101   pad->prog_id = -1;
102 
103   if (pad->free_func)
104     pad->free_func (pad->prepare_data);
105   pad->prepare_data = NULL;
106   pad->prepare_func = NULL;
107   pad->free_func = NULL;
108 
109   if (pad->codec_data)
110     gst_buffer_replace (&pad->codec_data, NULL);
111 
112   /* reference owned elsewhere */
113   pad->stream = NULL;
114   pad->prog = NULL;
115 
116   if (pad->language) {
117     g_free (pad->language);
118     pad->language = NULL;
119   }
120 }
121 
122 /* GstAggregatorPad implementation */
123 
124 static GstFlowReturn
gst_base_ts_mux_pad_flush(GstAggregatorPad * agg_pad,GstAggregator * agg)125 gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
126 {
127   GList *cur;
128   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
129 
130   /* Send initial segments again after a flush-stop, and also resend the
131    * header sections */
132   g_mutex_lock (&mux->lock);
133   mux->first = TRUE;
134 
135   /* output PAT, SI tables */
136   tsmux_resend_pat (mux->tsmux);
137   tsmux_resend_si (mux->tsmux);
138 
139   /* output PMT for each program */
140   for (cur = mux->tsmux->programs; cur; cur = cur->next) {
141     TsMuxProgram *program = (TsMuxProgram *) cur->data;
142 
143     tsmux_resend_pmt (program);
144   }
145   g_mutex_unlock (&mux->lock);
146 
147   return GST_FLOW_OK;
148 }
149 
150 /* GObject implementation */
151 
152 static void
gst_base_ts_mux_pad_dispose(GObject * obj)153 gst_base_ts_mux_pad_dispose (GObject * obj)
154 {
155   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj);
156 
157   gst_base_ts_mux_pad_reset (ts_pad);
158 
159   G_OBJECT_CLASS (gst_base_ts_mux_pad_parent_class)->dispose (obj);
160 }
161 
162 static void
gst_base_ts_mux_pad_class_init(GstBaseTsMuxPadClass * klass)163 gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass)
164 {
165   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
166   GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass);
167 
168   gobject_class->dispose = gst_base_ts_mux_pad_dispose;
169   gstaggpad_class->flush = gst_base_ts_mux_pad_flush;
170 
171   gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX, 0);
172 }
173 
174 static void
gst_base_ts_mux_pad_init(GstBaseTsMuxPad * vaggpad)175 gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad)
176 {
177 }
178 
179 /* GstBaseTsMux */
180 
181 enum
182 {
183   PROP_0,
184   PROP_PROG_MAP,
185   PROP_PAT_INTERVAL,
186   PROP_PMT_INTERVAL,
187   PROP_ALIGNMENT,
188   PROP_SI_INTERVAL,
189   PROP_BITRATE,
190   PROP_PCR_INTERVAL,
191   PROP_SCTE_35_PID,
192   PROP_SCTE_35_NULL_INTERVAL
193 };
194 
195 #define DEFAULT_SCTE_35_PID 0
196 
197 #define BASETSMUX_DEFAULT_ALIGNMENT    -1
198 
199 #define CLOCK_BASE 9LL
200 #define CLOCK_FREQ (CLOCK_BASE * 10000) /* 90 kHz PTS clock */
201 #define CLOCK_FREQ_SCR (CLOCK_FREQ * 300)       /* 27 MHz SCR clock */
202 #define TS_MUX_CLOCK_BASE (TSMUX_CLOCK_FREQ * 10 * 360)
203 
204 #define GSTTIME_TO_MPEGTIME(time) \
205     (((time) > 0 ? (gint64) 1 : (gint64) -1) * \
206     (gint64) gst_util_uint64_scale (ABS(time), CLOCK_BASE, GST_MSECOND/10))
207 /* 27 MHz SCR conversions: */
208 #define MPEG_SYS_TIME_TO_GSTTIME(time) (gst_util_uint64_scale ((time), \
209                         GST_USECOND, CLOCK_FREQ_SCR / 1000000))
210 #define GSTTIME_TO_MPEG_SYS_TIME(time) (gst_util_uint64_scale ((time), \
211                         CLOCK_FREQ_SCR / 1000000, GST_USECOND))
212 
213 #define DEFAULT_PROG_ID	0
214 
215 static GstStaticPadTemplate gst_base_ts_mux_src_factory =
216 GST_STATIC_PAD_TEMPLATE ("src",
217     GST_PAD_SRC,
218     GST_PAD_ALWAYS,
219     GST_STATIC_CAPS ("video/mpegts, "
220         "systemstream = (boolean) true, " "packetsize = (int) { 188, 192} ")
221     );
222 
223 typedef struct
224 {
225   GstMapInfo map_info;
226   GstBuffer *buffer;
227 } StreamData;
228 
229 G_DEFINE_TYPE_WITH_CODE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR,
230     gst_mpegts_initialize ());
231 
232 /* Internals */
233 
234 /* Takes over the ref on the buffer */
235 static StreamData *
stream_data_new(GstBuffer * buffer)236 stream_data_new (GstBuffer * buffer)
237 {
238   StreamData *res = g_new (StreamData, 1);
239   res->buffer = buffer;
240   gst_buffer_map (buffer, &(res->map_info), GST_MAP_READ);
241 
242   return res;
243 }
244 
245 static void
stream_data_free(StreamData * data)246 stream_data_free (StreamData * data)
247 {
248   if (data) {
249     gst_buffer_unmap (data->buffer, &data->map_info);
250     gst_buffer_unref (data->buffer);
251     g_free (data);
252   }
253 }
254 
255 #define parent_class gst_base_ts_mux_parent_class
256 
257 static void
gst_base_ts_mux_set_header_on_caps(GstBaseTsMux * mux)258 gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux)
259 {
260   GstBuffer *buf;
261   GstStructure *structure;
262   GValue array = { 0 };
263   GValue value = { 0 };
264   GstCaps *caps;
265 
266   caps = gst_pad_get_pad_template_caps (GST_AGGREGATOR_SRC_PAD (mux));
267 
268   caps = gst_caps_make_writable (caps);
269   structure = gst_caps_get_structure (caps, 0);
270 
271   gst_structure_set (structure, "packetsize", G_TYPE_INT, mux->packet_size,
272       NULL);
273 
274   g_value_init (&array, GST_TYPE_ARRAY);
275 
276   GST_LOG_OBJECT (mux, "setting %u packets into streamheader",
277       g_queue_get_length (&mux->streamheader));
278 
279   while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader)))) {
280     g_value_init (&value, GST_TYPE_BUFFER);
281     gst_value_take_buffer (&value, buf);
282     gst_value_array_append_value (&array, &value);
283     g_value_unset (&value);
284   }
285 
286   gst_structure_set_value (structure, "streamheader", &array);
287   gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
288   g_value_unset (&array);
289   gst_caps_unref (caps);
290 }
291 
292 static gboolean
steal_si_section(GstMpegtsSectionType * type,TsMuxSection * section,TsMux * mux)293 steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
294     TsMux * mux)
295 {
296   g_hash_table_insert (mux->si_sections, type, section);
297 
298   return TRUE;
299 }
300 
301 /* Must be called with mux->lock held */
302 static void
gst_base_ts_mux_reset(GstBaseTsMux * mux,gboolean alloc)303 gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
304 {
305   GstBuffer *buf;
306   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
307   GHashTable *si_sections = NULL;
308   GList *l;
309 
310   mux->first = TRUE;
311   mux->last_flow_ret = GST_FLOW_OK;
312   mux->last_ts = GST_CLOCK_TIME_NONE;
313   mux->is_delta = TRUE;
314   mux->is_header = FALSE;
315 
316   mux->streamheader_sent = FALSE;
317   mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
318   gst_event_replace (&mux->force_key_unit_event, NULL);
319 
320   if (mux->out_adapter)
321     gst_adapter_clear (mux->out_adapter);
322   mux->output_ts_offset = GST_CLOCK_STIME_NONE;
323 
324   if (mux->tsmux) {
325     if (mux->tsmux->si_sections)
326       si_sections = g_hash_table_ref (mux->tsmux->si_sections);
327 
328     tsmux_free (mux->tsmux);
329     mux->tsmux = NULL;
330   }
331 
332   if (mux->programs) {
333     g_hash_table_destroy (mux->programs);
334   }
335   mux->programs = g_hash_table_new (g_direct_hash, g_direct_equal);
336 
337   while ((buf = GST_BUFFER (g_queue_pop_head (&mux->streamheader))))
338     gst_buffer_unref (buf);
339 
340   gst_event_replace (&mux->force_key_unit_event, NULL);
341   gst_buffer_replace (&mux->out_buffer, NULL);
342 
343   GST_OBJECT_LOCK (mux);
344 
345   for (l = GST_ELEMENT (mux)->sinkpads; l; l = l->next) {
346     gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (l->data));
347   }
348 
349   GST_OBJECT_UNLOCK (mux);
350 
351   if (alloc) {
352     g_assert (klass->create_ts_mux);
353 
354     mux->tsmux = klass->create_ts_mux (mux);
355 
356     /* Preserve user-specified sections across resets */
357     if (si_sections)
358       g_hash_table_foreach_steal (si_sections, (GHRFunc) steal_si_section,
359           mux->tsmux);
360   }
361 
362   if (si_sections)
363     g_hash_table_unref (si_sections);
364 
365   mux->last_scte35_event_seqnum = GST_SEQNUM_INVALID;
366 
367   if (klass->reset)
368     klass->reset (mux);
369 }
370 
371 static void
release_buffer_cb(guint8 * data,void * user_data)372 release_buffer_cb (guint8 * data, void *user_data)
373 {
374   stream_data_free ((StreamData *) user_data);
375 }
376 
377 /* Must be called with mux->lock held */
378 static GstFlowReturn
gst_base_ts_mux_create_or_update_stream(GstBaseTsMux * mux,GstBaseTsMuxPad * ts_pad,GstCaps * caps)379 gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux,
380     GstBaseTsMuxPad * ts_pad, GstCaps * caps)
381 {
382   GstStructure *s;
383   guint st = TSMUX_ST_RESERVED;
384   const gchar *mt;
385   const GValue *value = NULL;
386   GstBuffer *codec_data = NULL;
387   guint8 opus_channel_config_code = 0;
388   guint16 profile = GST_JPEG2000_PARSE_PROFILE_NONE;
389   guint8 main_level = 0;
390   guint32 max_rate = 0;
391   guint8 color_spec = 0;
392   const gchar *stream_format = NULL;
393   const char *interlace_mode = NULL;
394   gchar *pmt_name;
395 
396   GST_DEBUG_OBJECT (ts_pad,
397       "%s stream with PID 0x%04x for caps %" GST_PTR_FORMAT,
398       ts_pad->stream ? "Recreating" : "Creating", ts_pad->pid, caps);
399 
400   s = gst_caps_get_structure (caps, 0);
401 
402   mt = gst_structure_get_name (s);
403   value = gst_structure_get_value (s, "codec_data");
404   if (value != NULL)
405     codec_data = gst_value_get_buffer (value);
406 
407   g_clear_pointer (&ts_pad->codec_data, gst_buffer_unref);
408   ts_pad->prepare_func = NULL;
409 
410   stream_format = gst_structure_get_string (s, "stream-format");
411 
412   if (strcmp (mt, "video/x-dirac") == 0) {
413     st = TSMUX_ST_VIDEO_DIRAC;
414   } else if (strcmp (mt, "audio/x-ac3") == 0) {
415     st = TSMUX_ST_PS_AUDIO_AC3;
416   } else if (strcmp (mt, "audio/x-dts") == 0) {
417     st = TSMUX_ST_PS_AUDIO_DTS;
418   } else if (strcmp (mt, "audio/x-lpcm") == 0) {
419     st = TSMUX_ST_PS_AUDIO_LPCM;
420   } else if (strcmp (mt, "video/x-h264") == 0) {
421     st = TSMUX_ST_VIDEO_H264;
422   } else if (strcmp (mt, "video/x-h265") == 0) {
423     st = TSMUX_ST_VIDEO_HEVC;
424   } else if (strcmp (mt, "audio/mpeg") == 0) {
425     gint mpegversion;
426 
427     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
428       GST_ERROR_OBJECT (ts_pad, "caps missing mpegversion");
429       goto not_negotiated;
430     }
431 
432     switch (mpegversion) {
433       case 1:{
434         int mpegaudioversion = 1;       /* Assume mpegaudioversion=1 for backwards compatibility */
435         (void) gst_structure_get_int (s, "mpegaudioversion", &mpegaudioversion);
436 
437         if (mpegaudioversion == 1)
438           st = TSMUX_ST_AUDIO_MPEG1;
439         else
440           st = TSMUX_ST_AUDIO_MPEG2;
441         break;
442       }
443       case 2:{
444         /* mpegversion=2 in GStreamer refers to MPEG-2 Part 7 audio,  */
445 
446         st = TSMUX_ST_AUDIO_AAC;
447 
448         /* Check the stream format. If raw, make dummy internal codec data from the caps */
449         if (g_strcmp0 (stream_format, "raw") == 0) {
450           ts_pad->codec_data =
451               gst_base_ts_mux_aac_mpeg2_make_codec_data (mux, caps);
452           ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg2;
453           if (ts_pad->codec_data == NULL) {
454             GST_ERROR_OBJECT (mux, "Invalid or incomplete caps for MPEG-2 AAC");
455             goto not_negotiated;
456           }
457         }
458         break;
459       }
460       case 4:
461       {
462         st = TSMUX_ST_AUDIO_AAC;
463 
464         /* Check the stream format. We need codec_data with RAW streams and mpegversion=4 */
465         if (g_strcmp0 (stream_format, "raw") == 0) {
466           if (codec_data) {
467             GST_DEBUG_OBJECT (ts_pad,
468                 "we have additional codec data (%" G_GSIZE_FORMAT " bytes)",
469                 gst_buffer_get_size (codec_data));
470             ts_pad->codec_data = gst_buffer_ref (codec_data);
471             ts_pad->prepare_func = gst_base_ts_mux_prepare_aac_mpeg4;
472           } else {
473             ts_pad->codec_data = NULL;
474             GST_ERROR_OBJECT (mux, "Need codec_data for raw MPEG-4 AAC");
475             goto not_negotiated;
476           }
477         } else if (codec_data) {
478           ts_pad->codec_data = gst_buffer_ref (codec_data);
479         } else {
480           ts_pad->codec_data = NULL;
481         }
482         break;
483       }
484       default:
485         GST_WARNING_OBJECT (ts_pad, "unsupported mpegversion %d", mpegversion);
486         goto not_negotiated;
487     }
488   } else if (strcmp (mt, "video/mpeg") == 0) {
489     gint mpegversion;
490 
491     if (!gst_structure_get_int (s, "mpegversion", &mpegversion)) {
492       GST_ERROR_OBJECT (ts_pad, "caps missing mpegversion");
493       goto not_negotiated;
494     }
495 
496     switch (mpegversion) {
497       case 1:
498         st = TSMUX_ST_VIDEO_MPEG1;
499         break;
500       case 2:
501         st = TSMUX_ST_VIDEO_MPEG2;
502         break;
503       case 4:
504         st = TSMUX_ST_VIDEO_MPEG4;
505         break;
506       default:
507         GST_WARNING_OBJECT (ts_pad, "unsupported mpegversion %d", mpegversion);
508         goto not_negotiated;
509     }
510   } else if (strcmp (mt, "subpicture/x-dvb") == 0) {
511     st = TSMUX_ST_PS_DVB_SUBPICTURE;
512   } else if (strcmp (mt, "application/x-teletext") == 0) {
513     st = TSMUX_ST_PS_TELETEXT;
514     /* needs a particularly sized layout */
515     ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext;
516   } else if (strcmp (mt, "audio/x-opus") == 0) {
517     guint8 channels, mapping_family, stream_count, coupled_count;
518     guint8 channel_mapping[256];
519 
520     if (!gst_codec_utils_opus_parse_caps (caps, NULL, &channels,
521             &mapping_family, &stream_count, &coupled_count, channel_mapping)) {
522       GST_ERROR_OBJECT (ts_pad, "Incomplete Opus caps");
523       goto not_negotiated;
524     }
525 
526     if (channels <= 2 && mapping_family == 0) {
527       opus_channel_config_code = channels;
528     } else if (channels == 2 && mapping_family == 255 && stream_count == 1
529         && coupled_count == 1) {
530       /* Dual mono */
531       opus_channel_config_code = 0;
532     } else if (channels >= 2 && channels <= 8 && mapping_family == 1) {
533       static const guint8 coupled_stream_counts[9] = {
534         1, 0, 1, 1, 2, 2, 2, 3, 3
535       };
536       static const guint8 channel_map_a[8][8] = {
537         {0},
538         {0, 1},
539         {0, 2, 1},
540         {0, 1, 2, 3},
541         {0, 4, 1, 2, 3},
542         {0, 4, 1, 2, 3, 5},
543         {0, 4, 1, 2, 3, 5, 6},
544         {0, 6, 1, 2, 3, 4, 5, 7},
545       };
546       static const guint8 channel_map_b[8][8] = {
547         {0},
548         {0, 1},
549         {0, 1, 2},
550         {0, 1, 2, 3},
551         {0, 1, 2, 3, 4},
552         {0, 1, 2, 3, 4, 5},
553         {0, 1, 2, 3, 4, 5, 6},
554         {0, 1, 2, 3, 4, 5, 6, 7},
555       };
556 
557       /* Vorbis mapping */
558       if (stream_count == channels - coupled_stream_counts[channels] &&
559           coupled_count == coupled_stream_counts[channels] &&
560           memcmp (channel_mapping, channel_map_a[channels - 1],
561               channels) == 0) {
562         opus_channel_config_code = channels;
563       } else if (stream_count == channels - coupled_stream_counts[channels] &&
564           coupled_count == coupled_stream_counts[channels] &&
565           memcmp (channel_mapping, channel_map_b[channels - 1],
566               channels) == 0) {
567         opus_channel_config_code = channels | 0x80;
568       } else {
569         GST_FIXME_OBJECT (ts_pad, "Opus channel mapping not handled");
570         goto not_negotiated;
571       }
572     }
573 
574     st = TSMUX_ST_PS_OPUS;
575     ts_pad->prepare_func = gst_base_ts_mux_prepare_opus;
576   } else if (strcmp (mt, "meta/x-klv") == 0) {
577     st = TSMUX_ST_PS_KLV;
578   } else if (strcmp (mt, "image/x-jpc") == 0) {
579     /*
580      * See this document for more details on standard:
581      *
582      * https://www.itu.int/rec/T-REC-H.222.0-201206-S/en
583      *  Annex S describes J2K details
584      *  Page 104 of this document describes J2k video descriptor
585      */
586 
587     const GValue *vProfile = gst_structure_get_value (s, "profile");
588     const GValue *vMainlevel = gst_structure_get_value (s, "main-level");
589     const GValue *vFramerate = gst_structure_get_value (s, "framerate");
590     const GValue *vColorimetry = gst_structure_get_value (s, "colorimetry");
591     j2k_private_data *private_data;
592 
593     /* for now, we relax the condition that profile must exist and equal
594      * GST_JPEG2000_PARSE_PROFILE_BC_SINGLE */
595     if (vProfile) {
596       profile = g_value_get_int (vProfile);
597       if (profile != GST_JPEG2000_PARSE_PROFILE_BC_SINGLE) {
598         GST_LOG_OBJECT (ts_pad, "Invalid JPEG 2000 profile %d", profile);
599         /* goto not_negotiated; */
600       }
601     }
602     /* for now, we will relax the condition that the main level must be present */
603     if (vMainlevel) {
604       main_level = g_value_get_uint (vMainlevel);
605       if (main_level > 11) {
606         GST_ERROR_OBJECT (ts_pad, "Invalid main level %d", main_level);
607         goto not_negotiated;
608       }
609       if (main_level >= 6) {
610         max_rate = 2 ^ (main_level - 6) * 1600 * 1000000;
611       } else {
612         switch (main_level) {
613           case 0:
614           case 1:
615           case 2:
616           case 3:
617             max_rate = 200 * 1000000;
618             break;
619           case 4:
620             max_rate = 400 * 1000000;
621             break;
622           case 5:
623             max_rate = 800 * 1000000;
624             break;
625           default:
626             break;
627         }
628       }
629     } else {
630       /* GST_ERROR_OBJECT (ts_pad, "Missing main level");
631        * goto not_negotiated; */
632     }
633 
634     /* We always mux video in J2K-over-MPEG-TS non-interlaced mode */
635     private_data = g_new0 (j2k_private_data, 1);
636     private_data->interlace = FALSE;
637     private_data->den = 0;
638     private_data->num = 0;
639     private_data->max_bitrate = max_rate;
640     private_data->color_spec = 1;
641     /* these two fields are not used, since we always mux as non-interlaced */
642     private_data->Fic = 1;
643     private_data->Fio = 0;
644 
645     /* Get Framerate */
646     if (vFramerate != NULL) {
647       /* Data for ELSM header */
648       private_data->num = gst_value_get_fraction_numerator (vFramerate);
649       private_data->den = gst_value_get_fraction_denominator (vFramerate);
650     }
651     /* Get Colorimetry */
652     if (vColorimetry) {
653       const char *colorimetry = g_value_get_string (vColorimetry);
654       color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_SRGB;  /* RGB as default */
655       if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT601)) {
656         color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC601;
657       } else {
658         if (g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_BT709)
659             || g_str_equal (colorimetry, GST_VIDEO_COLORIMETRY_SMPTE240M)) {
660           color_spec = GST_MPEGTS_JPEG2000_COLORSPEC_REC709;
661         }
662       }
663       private_data->color_spec = color_spec;
664     } else {
665       GST_ERROR_OBJECT (ts_pad, "Colorimetry not present in caps");
666       g_free (private_data);
667       goto not_negotiated;
668     }
669     st = TSMUX_ST_VIDEO_JP2K;
670     ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000;
671     ts_pad->prepare_data = private_data;
672     ts_pad->free_func = gst_base_ts_mux_free_jpeg2000;
673   } else {
674     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
675 
676     if (klass->handle_media_type) {
677       st = klass->handle_media_type (mux, mt, ts_pad);
678     }
679   }
680 
681   if (st == TSMUX_ST_RESERVED) {
682     GST_ERROR_OBJECT (ts_pad, "Failed to determine stream type");
683     goto error;
684   }
685 
686   if (ts_pad->stream && st != ts_pad->stream->stream_type) {
687     GST_ELEMENT_ERROR (mux, STREAM, MUX,
688         ("Stream type change from %02x to %02x not supported",
689             ts_pad->stream->stream_type, st), NULL);
690     goto error;
691   }
692 
693   if (ts_pad->stream == NULL) {
694     ts_pad->stream =
695         tsmux_create_stream (mux->tsmux, st, ts_pad->pid, ts_pad->language);
696     if (ts_pad->stream == NULL)
697       goto error;
698   }
699 
700   pmt_name = g_strdup_printf ("PMT_%d", ts_pad->pid);
701   if (mux->prog_map && gst_structure_has_field (mux->prog_map, pmt_name)) {
702     gst_structure_get_int (mux->prog_map, pmt_name, &ts_pad->stream->pmt_index);
703   }
704   g_free (pmt_name);
705 
706   interlace_mode = gst_structure_get_string (s, "interlace-mode");
707   gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling);
708   gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels);
709   gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate);
710 
711   /* frame rate */
712   gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num,
713       &ts_pad->stream->den);
714 
715   /* Interlace mode */
716   ts_pad->stream->interlace_mode = FALSE;
717   if (interlace_mode) {
718     ts_pad->stream->interlace_mode =
719         g_str_equal (interlace_mode, "interleaved");
720   }
721 
722   /* Width and Height */
723   gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size);
724   gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size);
725 
726   ts_pad->stream->color_spec = color_spec;
727   ts_pad->stream->max_bitrate = max_rate;
728   ts_pad->stream->profile_and_level = profile | main_level;
729 
730   ts_pad->stream->opus_channel_config_code = opus_channel_config_code;
731 
732   tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb);
733 
734   return GST_FLOW_OK;
735 
736   /* ERRORS */
737 not_negotiated:
738   return GST_FLOW_NOT_NEGOTIATED;
739 
740 error:
741   return GST_FLOW_ERROR;
742 }
743 
744 static gboolean
is_valid_pmt_pid(guint16 pmt_pid)745 is_valid_pmt_pid (guint16 pmt_pid)
746 {
747   if (pmt_pid < 0x0010 || pmt_pid > 0x1ffe)
748     return FALSE;
749   return TRUE;
750 }
751 
752 /* Must be called with mux->lock held */
753 static GstFlowReturn
gst_base_ts_mux_create_stream(GstBaseTsMux * mux,GstBaseTsMuxPad * ts_pad)754 gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
755 {
756   GstCaps *caps = gst_pad_get_current_caps (GST_PAD (ts_pad));
757   GstFlowReturn ret;
758 
759   if (caps == NULL) {
760     GST_DEBUG_OBJECT (ts_pad, "Sink pad caps were not set before pushing");
761     return GST_FLOW_NOT_NEGOTIATED;
762   }
763 
764   ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
765   gst_caps_unref (caps);
766 
767   if (ret == GST_FLOW_OK) {
768     tsmux_program_add_stream (ts_pad->prog, ts_pad->stream);
769   }
770 
771   return ret;
772 }
773 
774 /* Must be called with mux->lock held */
775 static GstFlowReturn
gst_base_ts_mux_create_pad_stream(GstBaseTsMux * mux,GstPad * pad)776 gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
777 {
778   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
779   gchar *name = NULL;
780   gchar *prop_name;
781   GstFlowReturn ret = GST_FLOW_OK;
782 
783   if (ts_pad->prog_id == -1) {
784     name = GST_PAD_NAME (pad);
785     if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) {
786       gint idx;
787       gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
788       if (!ret) {
789         GST_ELEMENT_ERROR (mux, STREAM, MUX,
790             ("Reading program map failed. Assuming default"), (NULL));
791         idx = DEFAULT_PROG_ID;
792       }
793       if (idx < 0) {
794         GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
795             "than zero; DEFAULT_PROGRAM = %d is used instead",
796             idx, name, DEFAULT_PROG_ID);
797         idx = DEFAULT_PROG_ID;
798       }
799       ts_pad->prog_id = idx;
800     } else {
801       ts_pad->prog_id = DEFAULT_PROG_ID;
802     }
803   }
804 
805   ts_pad->prog =
806       (TsMuxProgram *) g_hash_table_lookup (mux->programs,
807       GINT_TO_POINTER (ts_pad->prog_id));
808   if (ts_pad->prog == NULL) {
809     ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
810     if (ts_pad->prog == NULL)
811       goto no_program;
812     tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
813     tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
814     tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval);
815     g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
816         ts_pad->prog);
817 
818     /* Check for user-specified PMT PID */
819     prop_name = g_strdup_printf ("PMT_%d", ts_pad->prog->pgm_number);
820     if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
821       guint pmt_pid;
822 
823       if (gst_structure_get_uint (mux->prog_map, prop_name, &pmt_pid)) {
824         if (is_valid_pmt_pid (pmt_pid)) {
825           GST_DEBUG_OBJECT (mux, "User specified pid=%u as PMT for "
826               "program (prog_id = %d)", pmt_pid, ts_pad->prog->pgm_number);
827           tsmux_program_set_pmt_pid (ts_pad->prog, pmt_pid);
828         } else {
829           GST_ELEMENT_WARNING (mux, LIBRARY, SETTINGS,
830               ("User specified PMT pid %u for program %d is not valid.",
831                   pmt_pid, ts_pad->prog->pgm_number), (NULL));
832         }
833       }
834     }
835     g_free (prop_name);
836   }
837 
838   if (ts_pad->stream == NULL) {
839     ret = gst_base_ts_mux_create_stream (mux, ts_pad);
840     if (ret != GST_FLOW_OK)
841       goto no_stream;
842   }
843 
844   if (ts_pad->prog->pcr_stream == NULL) {
845     /* Take the first stream of the program for the PCR */
846     GST_DEBUG_OBJECT (ts_pad,
847         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
848         ts_pad->pid, ts_pad->prog_id);
849 
850     tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
851   }
852 
853   /* Check for user-specified PCR PID */
854   prop_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
855   if (mux->prog_map && gst_structure_has_field (mux->prog_map, prop_name)) {
856     const gchar *sink_name =
857         gst_structure_get_string (mux->prog_map, prop_name);
858 
859     if (!g_strcmp0 (name, sink_name)) {
860       GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
861           "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
862       tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
863     }
864   }
865   g_free (prop_name);
866 
867   return ret;
868 
869   /* ERRORS */
870 no_program:
871   {
872     GST_ELEMENT_ERROR (mux, STREAM, MUX,
873         ("Could not create new program"), (NULL));
874     return GST_FLOW_ERROR;
875   }
876 no_stream:
877   {
878     GST_ELEMENT_ERROR (mux, STREAM, MUX,
879         ("Could not create handler for stream"), (NULL));
880     return ret;
881   }
882 }
883 
884 /* Must be called with mux->lock held */
885 static gboolean
gst_base_ts_mux_create_pad_stream_func(GstElement * element,GstPad * pad,gpointer user_data)886 gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
887     gpointer user_data)
888 {
889   GstFlowReturn *ret = user_data;
890 
891   *ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
892 
893   return *ret == GST_FLOW_OK;
894 }
895 
896 /* Must be called with mux->lock held */
897 static GstFlowReturn
gst_base_ts_mux_create_streams(GstBaseTsMux * mux)898 gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
899 {
900   GstFlowReturn ret = GST_FLOW_OK;
901 
902   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (mux),
903       gst_base_ts_mux_create_pad_stream_func, &ret);
904 
905   return ret;
906 }
907 
908 static void
new_packet_common_init(GstBaseTsMux * mux,GstBuffer * buf,guint8 * data,guint len)909 new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
910     guint len)
911 {
912   /* Packets should be at least 188 bytes, but check anyway */
913   g_assert (len >= 2 || !data);
914 
915   if (!mux->streamheader_sent && data) {
916     guint pid = ((data[1] & 0x1f) << 8) | data[2];
917     /* if it's a PAT or a PMT */
918     if (pid == 0x00 || (pid >= TSMUX_START_PMT_PID && pid < TSMUX_START_ES_PID)) {
919       GstBuffer *hbuf;
920 
921       if (!buf) {
922         hbuf = gst_buffer_new_and_alloc (len);
923         gst_buffer_fill (hbuf, 0, data, len);
924       } else {
925         hbuf = gst_buffer_copy (buf);
926       }
927       GST_LOG_OBJECT (mux,
928           "Collecting packet with pid 0x%04x into streamheaders", pid);
929 
930       g_queue_push_tail (&mux->streamheader, hbuf);
931     } else if (!g_queue_is_empty (&mux->streamheader)) {
932       gst_base_ts_mux_set_header_on_caps (mux);
933       mux->streamheader_sent = TRUE;
934     }
935   }
936 
937   if (buf) {
938     if (mux->is_header) {
939       GST_LOG_OBJECT (mux, "marking as header buffer");
940       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER);
941     }
942     if (mux->is_delta) {
943       GST_LOG_OBJECT (mux, "marking as delta unit");
944       GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
945     } else {
946       GST_DEBUG_OBJECT (mux, "marking as non-delta unit");
947       mux->is_delta = TRUE;
948     }
949   }
950 }
951 
952 static GstFlowReturn
gst_base_ts_mux_push_packets(GstBaseTsMux * mux,gboolean force)953 gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force)
954 {
955   GstBufferList *buffer_list;
956   gint align = mux->alignment;
957   gint av, packet_size;
958 
959   packet_size = mux->packet_size;
960 
961   if (align < 0)
962     align = mux->automatic_alignment;
963 
964   av = gst_adapter_available (mux->out_adapter);
965   GST_LOG_OBJECT (mux, "align %d, av %d", align, av);
966 
967   if (av == 0)
968     return GST_FLOW_OK;
969 
970   /* no alignment, just push all available data */
971   if (align == 0) {
972     buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av);
973     return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux),
974         buffer_list);
975   }
976 
977   align *= packet_size;
978 
979   if (!force && align > av)
980     return GST_FLOW_OK;
981 
982   buffer_list = gst_buffer_list_new_sized ((av / align) + 1);
983 
984   GST_LOG_OBJECT (mux, "aligning to %d bytes", align);
985   while (align <= av) {
986     GstBuffer *buf;
987     GstClockTime pts;
988 
989     pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
990     buf = gst_adapter_take_buffer (mux->out_adapter, align);
991 
992     GST_BUFFER_PTS (buf) = pts;
993 
994     gst_buffer_list_add (buffer_list, buf);
995     av -= align;
996   }
997 
998   if (av > 0 && force) {
999     GstBuffer *buf;
1000     GstClockTime pts;
1001     guint8 *data;
1002     guint32 header;
1003     gint dummy;
1004     GstMapInfo map;
1005 
1006     GST_LOG_OBJECT (mux, "handling %d leftover bytes", av);
1007 
1008     pts = gst_adapter_prev_pts (mux->out_adapter, NULL);
1009     buf = gst_buffer_new_and_alloc (align);
1010 
1011     GST_BUFFER_PTS (buf) = pts;
1012 
1013     gst_buffer_map (buf, &map, GST_MAP_READ);
1014     data = map.data;
1015 
1016     gst_adapter_copy (mux->out_adapter, data, 0, av);
1017     gst_adapter_clear (mux->out_adapter);
1018 
1019     data += av;
1020     header = GST_READ_UINT32_BE (data - packet_size);
1021 
1022     dummy = (map.size - av) / packet_size;
1023     GST_LOG_OBJECT (mux, "adding %d null packets", dummy);
1024 
1025     for (; dummy > 0; dummy--) {
1026       gint offset;
1027 
1028       if (packet_size > GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH) {
1029         GST_WRITE_UINT32_BE (data, header);
1030         /* simply increase header a bit and never mind too much */
1031         header++;
1032         offset = 4;
1033       } else {
1034         offset = 0;
1035       }
1036       GST_WRITE_UINT8 (data + offset, TSMUX_SYNC_BYTE);
1037       /* null packet PID */
1038       GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
1039       /* no adaptation field exists | continuity counter undefined */
1040       GST_WRITE_UINT8 (data + offset + 3, 0x10);
1041       /* payload */
1042       memset (data + offset + 4, 0, GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH - 4);
1043       data += packet_size;
1044     }
1045 
1046     gst_buffer_unmap (buf, &map);
1047     gst_buffer_list_add (buffer_list, buf);
1048   }
1049 
1050   return gst_aggregator_finish_buffer_list (GST_AGGREGATOR (mux), buffer_list);
1051 }
1052 
1053 static GstFlowReturn
gst_base_ts_mux_collect_packet(GstBaseTsMux * mux,GstBuffer * buf)1054 gst_base_ts_mux_collect_packet (GstBaseTsMux * mux, GstBuffer * buf)
1055 {
1056   GST_LOG_OBJECT (mux, "collecting packet size %" G_GSIZE_FORMAT,
1057       gst_buffer_get_size (buf));
1058   gst_adapter_push (mux->out_adapter, buf);
1059 
1060   return GST_FLOW_OK;
1061 }
1062 
1063 static GstEvent *
check_pending_key_unit_event(GstEvent * pending_event,GstSegment * segment,GstClockTime timestamp,guint flags,GstClockTime pending_key_unit_ts)1064 check_pending_key_unit_event (GstEvent * pending_event, GstSegment * segment,
1065     GstClockTime timestamp, guint flags, GstClockTime pending_key_unit_ts)
1066 {
1067   GstClockTime running_time, stream_time;
1068   gboolean all_headers;
1069   guint count;
1070   GstEvent *event = NULL;
1071 
1072   g_assert (segment != NULL);
1073 
1074   if (pending_event == NULL)
1075     goto out;
1076 
1077   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1078       timestamp == GST_CLOCK_TIME_NONE)
1079     goto out;
1080 
1081   running_time = timestamp;
1082 
1083   GST_INFO ("now %" GST_TIME_FORMAT " wanted %" GST_TIME_FORMAT,
1084       GST_TIME_ARGS (running_time), GST_TIME_ARGS (pending_key_unit_ts));
1085   if (GST_CLOCK_TIME_IS_VALID (pending_key_unit_ts) &&
1086       running_time < pending_key_unit_ts)
1087     goto out;
1088 
1089   if (flags & GST_BUFFER_FLAG_DELTA_UNIT) {
1090     GST_INFO ("pending force key unit, waiting for keyframe");
1091     goto out;
1092   }
1093 
1094   stream_time = gst_segment_to_stream_time (segment,
1095       GST_FORMAT_TIME, timestamp);
1096 
1097   if (GST_EVENT_TYPE (pending_event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
1098     gst_video_event_parse_downstream_force_key_unit (pending_event,
1099         NULL, NULL, NULL, &all_headers, &count);
1100   } else {
1101     gst_video_event_parse_upstream_force_key_unit (pending_event, NULL,
1102         &all_headers, &count);
1103   }
1104 
1105   event =
1106       gst_video_event_new_downstream_force_key_unit (timestamp, stream_time,
1107       running_time, all_headers, count);
1108   gst_event_set_seqnum (event, gst_event_get_seqnum (pending_event));
1109 
1110 out:
1111   return event;
1112 }
1113 
1114 /* Called when the TsMux has prepared a packet for output. Return FALSE
1115  * on error */
1116 static gboolean
new_packet_cb(GstBuffer * buf,void * user_data,gint64 new_pcr)1117 new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
1118 {
1119   GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1120   GstAggregator *agg = GST_AGGREGATOR (mux);
1121   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1122   GstMapInfo map;
1123   GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1124 
1125   g_assert (klass->output_packet);
1126 
1127   gst_buffer_map (buf, &map, GST_MAP_READWRITE);
1128 
1129   if (!GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1130     /* tsmux isn't generating timestamps. Use the input times */
1131     GST_BUFFER_PTS (buf) = mux->last_ts;
1132   }
1133 
1134   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1135     if (!GST_CLOCK_STIME_IS_VALID (mux->output_ts_offset)) {
1136       GstClockTime output_start_time = agg_segment->position;
1137       if (agg_segment->position == -1
1138           || agg_segment->position < agg_segment->start) {
1139         output_start_time = agg_segment->start;
1140       }
1141 
1142       mux->output_ts_offset =
1143           GST_CLOCK_DIFF (GST_BUFFER_PTS (buf), output_start_time);
1144 
1145       GST_DEBUG_OBJECT (mux, "New output ts offset %" GST_STIME_FORMAT,
1146           GST_STIME_ARGS (mux->output_ts_offset));
1147     }
1148 
1149     GST_BUFFER_PTS (buf) += mux->output_ts_offset;
1150 
1151     agg_segment->position = GST_BUFFER_PTS (buf);
1152   } else if (agg_segment->position == -1
1153       || agg_segment->position < agg_segment->start) {
1154     GST_BUFFER_PTS (buf) = agg_segment->start;
1155   } else {
1156     GST_BUFFER_PTS (buf) = agg_segment->position;
1157   }
1158 
1159   /* do common init (flags and streamheaders) */
1160   new_packet_common_init (mux, buf, map.data, map.size);
1161 
1162   gst_buffer_unmap (buf, &map);
1163 
1164   return klass->output_packet (mux, buf, new_pcr);
1165 }
1166 
1167 /* called when TsMux needs new packet to write into */
1168 static void
alloc_packet_cb(GstBuffer ** buf,void * user_data)1169 alloc_packet_cb (GstBuffer ** buf, void *user_data)
1170 {
1171   GstBaseTsMux *mux = (GstBaseTsMux *) user_data;
1172   GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
1173 
1174   g_assert (klass->allocate_packet);
1175 
1176   klass->allocate_packet (mux, buf);
1177 }
1178 
1179 static GstFlowReturn
gst_base_ts_mux_aggregate_buffer(GstBaseTsMux * mux,GstAggregatorPad * agg_pad,GstBuffer * buf)1180 gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
1181     GstAggregatorPad * agg_pad, GstBuffer * buf)
1182 {
1183   GstFlowReturn ret = GST_FLOW_OK;
1184   GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad);
1185   TsMuxProgram *prog;
1186   gint64 pts = GST_CLOCK_STIME_NONE;
1187   gint64 dts = GST_CLOCK_STIME_NONE;
1188   gboolean delta = TRUE, header = FALSE;
1189   StreamData *stream_data;
1190   GstMpegtsSection *scte_section = NULL;
1191 
1192   GST_DEBUG_OBJECT (mux, "Pads collected");
1193 
1194   if (buf && gst_buffer_get_size (buf) == 0
1195       && GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP)) {
1196     gst_buffer_unref (buf);
1197     return GST_FLOW_OK;
1198   }
1199 
1200   g_mutex_lock (&mux->lock);
1201   if (G_UNLIKELY (mux->first)) {
1202     ret = gst_base_ts_mux_create_streams (mux);
1203     if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1204       if (buf)
1205         gst_buffer_unref (buf);
1206       g_mutex_unlock (&mux->lock);
1207       return ret;
1208     }
1209 
1210     mux->first = FALSE;
1211   }
1212 
1213   prog = best->prog;
1214   if (prog == NULL) {
1215     GList *cur;
1216 
1217     gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
1218     tsmux_resend_pat (mux->tsmux);
1219     tsmux_resend_si (mux->tsmux);
1220     prog = best->prog;
1221     g_assert_nonnull (prog);
1222 
1223     /* output PMT for each program */
1224     for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1225       TsMuxProgram *program = (TsMuxProgram *) cur->data;
1226 
1227       tsmux_resend_pmt (program);
1228     }
1229   }
1230 
1231   g_assert (buf != NULL);
1232 
1233   if (best->prepare_func) {
1234     GstBuffer *tmp;
1235 
1236     tmp = best->prepare_func (buf, best, mux);
1237     g_assert (tmp);
1238     gst_buffer_unref (buf);
1239     buf = tmp;
1240   }
1241 
1242   if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
1243     GstEvent *event;
1244 
1245     g_mutex_unlock (&mux->lock);
1246     event = check_pending_key_unit_event (mux->force_key_unit_event,
1247         &agg_pad->segment, GST_BUFFER_PTS (buf),
1248         GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
1249     if (event) {
1250       GstClockTime running_time;
1251       guint count;
1252       GList *cur;
1253 
1254       mux->pending_key_unit_ts = GST_CLOCK_TIME_NONE;
1255       gst_event_replace (&mux->force_key_unit_event, NULL);
1256 
1257       gst_video_event_parse_downstream_force_key_unit (event,
1258           NULL, NULL, &running_time, NULL, &count);
1259 
1260       GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d "
1261           "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event),
1262           GST_TIME_ARGS (running_time), count);
1263       gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
1264 
1265       g_mutex_lock (&mux->lock);
1266       /* output PAT, SI tables */
1267       tsmux_resend_pat (mux->tsmux);
1268       tsmux_resend_si (mux->tsmux);
1269 
1270       /* output PMT for each program */
1271       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1272         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1273 
1274         tsmux_resend_pmt (program);
1275       }
1276     } else {
1277       g_mutex_lock (&mux->lock);
1278     }
1279   }
1280 
1281   if (G_UNLIKELY (prog->pcr_stream == NULL)) {
1282     /* Take the first data stream for the PCR */
1283     GST_DEBUG_OBJECT (best,
1284         "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
1285         best->pid, best->prog_id);
1286 
1287     /* Set the chosen PCR stream */
1288     tsmux_program_set_pcr_stream (prog, best->stream);
1289   }
1290 
1291   GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid);
1292 
1293   GST_OBJECT_LOCK (mux);
1294   scte_section = mux->pending_scte35_section;
1295   mux->pending_scte35_section = NULL;
1296   GST_OBJECT_UNLOCK (mux);
1297   if (G_UNLIKELY (scte_section)) {
1298     GST_DEBUG_OBJECT (mux, "Sending pending SCTE section");
1299     if (!tsmux_send_section (mux->tsmux, scte_section))
1300       GST_ERROR_OBJECT (mux, "Error sending SCTE section !");
1301   }
1302 
1303   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) {
1304     pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf));
1305     GST_DEBUG_OBJECT (mux, "Buffer has PTS  %" GST_TIME_FORMAT " pts %"
1306         G_GINT64_FORMAT "%s", GST_TIME_ARGS (GST_BUFFER_PTS (buf)), pts,
1307         !GST_BUFFER_FLAG_IS_SET (buf,
1308             GST_BUFFER_FLAG_DELTA_UNIT) ? " (keyframe)" : "");
1309   }
1310 
1311   if (GST_CLOCK_STIME_IS_VALID (best->dts)) {
1312     dts = GSTTIME_TO_MPEGTIME (best->dts);
1313     GST_DEBUG_OBJECT (mux, "Buffer has DTS %" GST_STIME_FORMAT " dts %"
1314         G_GINT64_FORMAT, GST_STIME_ARGS (best->dts), dts);
1315   }
1316 
1317   /* should not have a DTS without PTS */
1318   if (!GST_CLOCK_STIME_IS_VALID (pts) && GST_CLOCK_STIME_IS_VALID (dts)) {
1319     GST_DEBUG_OBJECT (mux, "using DTS for unknown PTS");
1320     pts = dts;
1321   }
1322 
1323   if (best->stream->is_video_stream) {
1324     delta = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1325     header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER);
1326   }
1327 
1328   if (best->stream->is_meta && gst_buffer_get_size (buf) > (G_MAXUINT16 - 3)) {
1329     GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
1330 
1331     gst_buffer_unref (buf);
1332     g_mutex_unlock (&mux->lock);
1333     return GST_FLOW_OK;
1334   }
1335 
1336   GST_DEBUG_OBJECT (mux, "delta: %d", delta);
1337 
1338   if (gst_buffer_get_size (buf) > 0) {
1339     stream_data = stream_data_new (buf);
1340     tsmux_stream_add_data (best->stream, stream_data->map_info.data,
1341         stream_data->map_info.size, stream_data, pts, dts, !delta);
1342   }
1343 
1344   /* outgoing ts follows ts of PCR program stream */
1345   if (prog->pcr_stream == best->stream) {
1346     /* prefer DTS if present for PCR as it should be monotone */
1347     mux->last_ts =
1348         GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (buf)) ?
1349         GST_BUFFER_DTS (buf) : GST_BUFFER_PTS (buf);
1350   }
1351 
1352   mux->is_delta = delta;
1353   mux->is_header = header;
1354   while (tsmux_stream_bytes_in_buffer (best->stream) > 0) {
1355     if (!tsmux_write_stream_packet (mux->tsmux, best->stream)) {
1356       /* Failed writing data for some reason. Set appropriate error */
1357       GST_DEBUG_OBJECT (mux, "Failed to write data packet");
1358       GST_ELEMENT_ERROR (mux, STREAM, MUX,
1359           ("Failed writing output data to stream %04x", best->stream->id),
1360           (NULL));
1361       goto write_fail;
1362     }
1363   }
1364   g_mutex_unlock (&mux->lock);
1365   /* flush packet cache */
1366   return gst_base_ts_mux_push_packets (mux, FALSE);
1367 
1368   /* ERRORS */
1369 write_fail:
1370   {
1371     return mux->last_flow_ret;
1372   }
1373 }
1374 
1375 /* GstElement implementation */
1376 static gboolean
gst_base_ts_mux_has_pad_with_pid(GstBaseTsMux * mux,guint16 pid)1377 gst_base_ts_mux_has_pad_with_pid (GstBaseTsMux * mux, guint16 pid)
1378 {
1379   GList *l;
1380   gboolean res = FALSE;
1381 
1382   GST_OBJECT_LOCK (mux);
1383 
1384   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1385     GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
1386 
1387     if (tpad->pid == pid) {
1388       res = TRUE;
1389       break;
1390     }
1391   }
1392 
1393   GST_OBJECT_UNLOCK (mux);
1394   return res;
1395 }
1396 
1397 static GstPad *
gst_base_ts_mux_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)1398 gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
1399     const gchar * name, const GstCaps * caps)
1400 {
1401   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1402   gint pid = -1;
1403   GstPad *pad = NULL;
1404   gchar *free_name = NULL;
1405 
1406   g_mutex_lock (&mux->lock);
1407   if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
1408     if (tsmux_find_stream (mux->tsmux, pid)) {
1409       g_mutex_unlock (&mux->lock);
1410       goto stream_exists;
1411     }
1412     /* Make sure we don't use reserved PID.
1413      * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
1414     if (pid < TSMUX_START_ES_PID)
1415       goto invalid_stream_pid;
1416   } else {
1417     do {
1418       pid = tsmux_get_new_pid (mux->tsmux);
1419     } while (gst_base_ts_mux_has_pad_with_pid (mux, pid));
1420 
1421     /* Name the pad correctly after the selected pid */
1422     name = free_name = g_strdup_printf ("sink_%d", pid);
1423   }
1424   g_mutex_unlock (&mux->lock);
1425 
1426   pad = (GstPad *)
1427       GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
1428       templ, name, caps);
1429 
1430   gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad));
1431   GST_BASE_TS_MUX_PAD (pad)->pid = pid;
1432 
1433   g_free (free_name);
1434 
1435   return pad;
1436 
1437   /* ERRORS */
1438 stream_exists:
1439   {
1440     GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"),
1441         (NULL));
1442     return NULL;
1443   }
1444 
1445 invalid_stream_pid:
1446   {
1447     GST_ELEMENT_ERROR (element, STREAM, MUX,
1448         ("Invalid Elementary stream PID (0x%02u < 0x40)", pid), (NULL));
1449     return NULL;
1450   }
1451 }
1452 
1453 static void
gst_base_ts_mux_release_pad(GstElement * element,GstPad * pad)1454 gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
1455 {
1456   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1457 
1458   g_mutex_lock (&mux->lock);
1459   if (mux->tsmux) {
1460     GList *cur;
1461     GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
1462     gint pid = ts_pad->pid;
1463 
1464     if (ts_pad->prog) {
1465       if (ts_pad->prog->pcr_stream == ts_pad->stream) {
1466         tsmux_program_set_pcr_stream (ts_pad->prog, NULL);
1467       }
1468       if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) {
1469         g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id));
1470       }
1471     }
1472 
1473     tsmux_resend_pat (mux->tsmux);
1474     tsmux_resend_si (mux->tsmux);
1475 
1476     /* output PMT for each program */
1477     for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1478       TsMuxProgram *program = (TsMuxProgram *) cur->data;
1479 
1480       tsmux_resend_pmt (program);
1481     }
1482   }
1483   g_mutex_unlock (&mux->lock);
1484 
1485   GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
1486 }
1487 
1488 /* GstAggregator implementation */
1489 
1490 static void
request_keyframe(GstBaseTsMux * mux,GstClockTime running_time)1491 request_keyframe (GstBaseTsMux * mux, GstClockTime running_time)
1492 {
1493   GList *l;
1494   GST_OBJECT_LOCK (mux);
1495 
1496   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
1497     gst_pad_push_event (GST_PAD (l->data),
1498         gst_video_event_new_upstream_force_key_unit (running_time, TRUE, 0));
1499   }
1500 
1501   GST_OBJECT_UNLOCK (mux);
1502 }
1503 
1504 static const guint32 crc_tab[256] = {
1505   0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b,
1506   0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
1507   0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, 0x4c11db70, 0x48d0c6c7,
1508   0x4593e01e, 0x4152fda9, 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
1509   0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, 0x791d4014, 0x7ddc5da3,
1510   0x709f7b7a, 0x745e66cd, 0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
1511   0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, 0xbe2b5b58, 0xbaea46ef,
1512   0xb7a96036, 0xb3687d81, 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
1513   0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49, 0xc7361b4c, 0xc3f706fb,
1514   0xceb42022, 0xca753d95, 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
1515   0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, 0x34867077, 0x30476dc0,
1516   0x3d044b19, 0x39c556ae, 0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
1517   0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, 0x018aeb13, 0x054bf6a4,
1518   0x0808d07d, 0x0cc9cdca, 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
1519   0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02, 0x5e9f46bf, 0x5a5e5b08,
1520   0x571d7dd1, 0x53dc6066, 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
1521   0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, 0xbfa1b04b, 0xbb60adfc,
1522   0xb6238b25, 0xb2e29692, 0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
1523   0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, 0xe0b41de7, 0xe4750050,
1524   0xe9362689, 0xedf73b3e, 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
1525   0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686, 0xd5b88683, 0xd1799b34,
1526   0xdc3abded, 0xd8fba05a, 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
1527   0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, 0x4f040d56, 0x4bc510e1,
1528   0x46863638, 0x42472b8f, 0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
1529   0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, 0x36194d42, 0x32d850f5,
1530   0x3f9b762c, 0x3b5a6b9b, 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
1531   0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623, 0xf12f560e, 0xf5ee4bb9,
1532   0xf8ad6d60, 0xfc6c70d7, 0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
1533   0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, 0xc423cd6a, 0xc0e2d0dd,
1534   0xcda1f604, 0xc960ebb3, 0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
1535   0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, 0x9b3660c6, 0x9ff77d71,
1536   0x92b45ba8, 0x9675461f, 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
1537   0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640, 0x4e8ee645, 0x4a4ffbf2,
1538   0x470cdd2b, 0x43cdc09c, 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
1539   0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, 0x119b4be9, 0x155a565e,
1540   0x18197087, 0x1cd86d30, 0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
1541   0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, 0x2497d08d, 0x2056cd3a,
1542   0x2d15ebe3, 0x29d4f654, 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
1543   0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c, 0xe3a1cbc1, 0xe760d676,
1544   0xea23f0af, 0xeee2ed18, 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
1545   0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662,
1546   0x933eb0bb, 0x97ffad0c, 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
1547   0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
1548 };
1549 
1550 static guint32
_calc_crc32(const guint8 * data,guint datalen)1551 _calc_crc32 (const guint8 * data, guint datalen)
1552 {
1553   gint i;
1554   guint32 crc = 0xffffffff;
1555 
1556   for (i = 0; i < datalen; i++) {
1557     crc = (crc << 8) ^ crc_tab[((crc >> 24) ^ *data++) & 0xff];
1558   }
1559   return crc;
1560 }
1561 
1562 #define MPEGTIME_TO_GSTTIME(t) ((t) * (guint64)100000 / 9)
1563 
1564 static GstMpegtsSCTESpliceEvent *
copy_splice(GstMpegtsSCTESpliceEvent * splice)1565 copy_splice (GstMpegtsSCTESpliceEvent * splice)
1566 {
1567   return g_boxed_copy (GST_TYPE_MPEGTS_SCTE_SPLICE_EVENT, splice);
1568 }
1569 
1570 static void
free_splice(GstMpegtsSCTESpliceEvent * splice)1571 free_splice (GstMpegtsSCTESpliceEvent * splice)
1572 {
1573   g_boxed_free (GST_TYPE_MPEGTS_SCTE_SPLICE_EVENT, splice);
1574 }
1575 
1576 /* FIXME: get rid of this when depending on glib >= 2.62 */
1577 
1578 static GPtrArray *
_g_ptr_array_copy(GPtrArray * array,GCopyFunc func,GFreeFunc free_func,gpointer user_data)1579 _g_ptr_array_copy (GPtrArray * array,
1580     GCopyFunc func, GFreeFunc free_func, gpointer user_data)
1581 {
1582   GPtrArray *new_array;
1583 
1584   g_return_val_if_fail (array != NULL, NULL);
1585 
1586   new_array = g_ptr_array_new_with_free_func (free_func);
1587 
1588   g_ptr_array_set_size (new_array, array->len);
1589 
1590   if (func != NULL) {
1591     guint i;
1592 
1593     for (i = 0; i < array->len; i++)
1594       new_array->pdata[i] = func (array->pdata[i], user_data);
1595   } else if (array->len > 0) {
1596     memcpy (new_array->pdata, array->pdata,
1597         array->len * sizeof (*array->pdata));
1598   }
1599 
1600   new_array->len = array->len;
1601 
1602   return new_array;
1603 }
1604 
1605 static GstMpegtsSCTESIT *
deep_copy_sit(const GstMpegtsSCTESIT * sit)1606 deep_copy_sit (const GstMpegtsSCTESIT * sit)
1607 {
1608   GstMpegtsSCTESIT *sit_copy = g_boxed_copy (GST_TYPE_MPEGTS_SCTE_SIT, sit);
1609   GPtrArray *splices_copy =
1610       _g_ptr_array_copy (sit_copy->splices, (GCopyFunc) copy_splice,
1611       (GFreeFunc) free_splice, NULL);
1612 
1613   g_ptr_array_unref (sit_copy->splices);
1614   sit_copy->splices = splices_copy;
1615 
1616   return sit_copy;
1617 }
1618 
1619 /* Takes ownership of @section.
1620  *
1621  * This function is a bit complex because the SCTE sections can
1622  * have various origins:
1623  *
1624  * * Sections created by the application with the gst_mpegts_scte_*_new()
1625  *   API. The splice times / durations contained by these are expressed
1626  *   in the GStreamer running time domain, and must be translated to
1627  *   our local PES time domain. In this case, we will packetize the section
1628  *   ourselves.
1629  *
1630  * * Sections passed through from tsdemux: this case is complicated as
1631  *   splice times in the incoming stream may be encrypted, with pts_adjustment
1632  *   being the only timing field guaranteed *not* to be encrypted. In this
1633  *   case, the original binary data (section->data) will be reinjected as is
1634  *   in the output stream, with pts_adjustment adjusted. tsdemux provides us
1635  *   with the pts_offset it introduces, the difference between the original
1636  *   PES PTSs and the running times it outputs.
1637  *
1638  * Additionally, in either of these cases when the splice times aren't encrypted
1639  * we want to make use of those to request keyframes. For the passthrough case,
1640  * as the splice times are left untouched tsdemux provides us with the running
1641  * times the section originally referred to. We cannot calculate it locally
1642  * because we would need to have access to the information that the timestamps
1643  * in the original PES domain have wrapped around, and how many times they have
1644  * done so. While we could probably make educated guesses, tsdemux (more specifically
1645  * mpegtspacketizer) already keeps track of that, and it seemed more logical to
1646  * perform the calculation there and forward it alongside the downstream events.
1647  *
1648  * Finally, while we can't request keyframes at splice points in the encrypted
1649  * case, if the input stream was compliant in that regard and no reencoding took
1650  * place the splice times will still match with valid splice points, it is up
1651  * to the application to ensure that that is the case.
1652  */
1653 static void
handle_scte35_section(GstBaseTsMux * mux,GstEvent * event,GstMpegtsSection * section,guint64 mpeg_pts_offset,GstStructure * rtime_map)1654 handle_scte35_section (GstBaseTsMux * mux, GstEvent * event,
1655     GstMpegtsSection * section, guint64 mpeg_pts_offset,
1656     GstStructure * rtime_map)
1657 {
1658   GstMpegtsSCTESIT *sit;
1659   guint i;
1660   gboolean forward = TRUE;
1661   guint64 pts_adjust;
1662   guint8 *section_data;
1663   guint8 *crc;
1664   gboolean translate = FALSE;
1665 
1666   sit = (GstMpegtsSCTESIT *) gst_mpegts_section_get_scte_sit (section);
1667 
1668   /* When the application injects manually constructed splice events,
1669    * their time domain is the GStreamer running time, we receive them
1670    * unpacketized and translate the fields in the SIT to local PTS.
1671    *
1672    * We make a copy of the SIT in order to make sure we can rewrite it.
1673    */
1674   if (sit->is_running_time) {
1675     sit = deep_copy_sit (sit);
1676     translate = TRUE;
1677   }
1678 
1679   switch (sit->splice_command_type) {
1680     case GST_MTS_SCTE_SPLICE_COMMAND_NULL:
1681       /* We implement heartbeating ourselves */
1682       forward = FALSE;
1683       break;
1684     case GST_MTS_SCTE_SPLICE_COMMAND_SCHEDULE:
1685       /* No need to request keyframes at this point, splice_insert
1686        * messages will precede the future splice points and we
1687        * can request keyframes then. Only translate if needed.
1688        */
1689       if (translate) {
1690         for (i = 0; i < sit->splices->len; i++) {
1691           GstMpegtsSCTESpliceEvent *sevent =
1692               g_ptr_array_index (sit->splices, i);
1693 
1694           if (sevent->program_splice_time_specified)
1695             sevent->program_splice_time =
1696                 GSTTIME_TO_MPEGTIME (sevent->program_splice_time) +
1697                 TS_MUX_CLOCK_BASE;
1698 
1699           if (sevent->duration_flag)
1700             sevent->break_duration =
1701                 GSTTIME_TO_MPEGTIME (sevent->break_duration);
1702         }
1703       }
1704       break;
1705     case GST_MTS_SCTE_SPLICE_COMMAND_INSERT:
1706       /* We want keyframes at splice points */
1707       if (sit->fully_parsed && (rtime_map || translate)) {
1708 
1709         for (i = 0; i < sit->splices->len; i++) {
1710           guint64 running_time = GST_CLOCK_TIME_NONE;
1711 
1712           GstMpegtsSCTESpliceEvent *sevent =
1713               g_ptr_array_index (sit->splices, i);
1714           if (sevent->program_splice_time_specified) {
1715             if (rtime_map) {
1716               gchar *field_name = g_strdup_printf ("event-%u-splice-time",
1717                   sevent->splice_event_id);
1718               if (gst_structure_get_uint64 (rtime_map, field_name,
1719                       &running_time)) {
1720                 GST_DEBUG_OBJECT (mux,
1721                     "Requesting keyframe for splice point at %" GST_TIME_FORMAT,
1722                     GST_TIME_ARGS (running_time));
1723                 request_keyframe (mux, running_time);
1724               }
1725               g_free (field_name);
1726             } else {
1727               g_assert (translate == TRUE);
1728               running_time = sevent->program_splice_time;
1729               GST_DEBUG_OBJECT (mux,
1730                   "Requesting keyframe for splice point at %" GST_TIME_FORMAT,
1731                   GST_TIME_ARGS (running_time));
1732               request_keyframe (mux, running_time);
1733               sevent->program_splice_time =
1734                   GSTTIME_TO_MPEGTIME (running_time) + TS_MUX_CLOCK_BASE;
1735             }
1736           } else {
1737             GST_DEBUG_OBJECT (mux,
1738                 "Requesting keyframe for immediate splice point");
1739             request_keyframe (mux, GST_CLOCK_TIME_NONE);
1740           }
1741 
1742           if (sevent->duration_flag) {
1743             if (translate) {
1744               sevent->break_duration =
1745                   GSTTIME_TO_MPEGTIME (sevent->break_duration);
1746             }
1747 
1748             /* Even if auto_return is FALSE, when a break_duration is specified it
1749              * is intended as a redundancy mechanism in case the follow-up
1750              * splice insert goes missing.
1751              *
1752              * Schedule a keyframe at that point (if we can calculate its position
1753              * accurately).
1754              */
1755             if (GST_CLOCK_TIME_IS_VALID (running_time)) {
1756               running_time += MPEGTIME_TO_GSTTIME (sevent->break_duration);
1757               GST_DEBUG_OBJECT (mux,
1758                   "Requesting keyframe for end of break at %" GST_TIME_FORMAT,
1759                   GST_TIME_ARGS (running_time));
1760               request_keyframe (mux, running_time);
1761             }
1762           }
1763         }
1764       }
1765       break;
1766     case GST_MTS_SCTE_SPLICE_COMMAND_TIME:{
1767       /* Adjust timestamps and potentially request keyframes */
1768       gboolean do_request_keyframes = FALSE;
1769 
1770       /* TODO: we can probably be a little more fine-tuned about determining
1771        * whether a keyframe is actually needed, but this at least takes care
1772        * of the requirement in 10.3.4 that a keyframe should not be created
1773        * when the signal contains only a time_descriptor.
1774        */
1775       if (sit->fully_parsed && (rtime_map || translate)) {
1776         for (i = 0; i < sit->descriptors->len; i++) {
1777           GstMpegtsDescriptor *descriptor =
1778               g_ptr_array_index (sit->descriptors, i);
1779 
1780           switch (descriptor->tag) {
1781             case GST_MTS_SCTE_DESC_AVAIL:
1782             case GST_MTS_SCTE_DESC_DTMF:
1783             case GST_MTS_SCTE_DESC_SEGMENTATION:
1784               do_request_keyframes = TRUE;
1785               break;
1786             case GST_MTS_SCTE_DESC_TIME:
1787             case GST_MTS_SCTE_DESC_AUDIO:
1788               break;
1789           }
1790 
1791           if (do_request_keyframes)
1792             break;
1793         }
1794 
1795         if (sit->splice_time_specified) {
1796           GstClockTime running_time = GST_CLOCK_TIME_NONE;
1797 
1798           if (rtime_map) {
1799             if (do_request_keyframes
1800                 && gst_structure_get_uint64 (rtime_map, "splice-time",
1801                     &running_time)) {
1802               GST_DEBUG_OBJECT (mux,
1803                   "Requesting keyframe for time signal at %" GST_TIME_FORMAT,
1804                   GST_TIME_ARGS (running_time));
1805               request_keyframe (mux, running_time);
1806             }
1807           } else {
1808             g_assert (translate);
1809             running_time = sit->splice_time;
1810             sit->splice_time =
1811                 GSTTIME_TO_MPEGTIME (running_time) + TS_MUX_CLOCK_BASE;
1812             if (do_request_keyframes) {
1813               GST_DEBUG_OBJECT (mux,
1814                   "Requesting keyframe for time signal at %" GST_TIME_FORMAT,
1815                   GST_TIME_ARGS (running_time));
1816               request_keyframe (mux, running_time);
1817             }
1818           }
1819         } else if (do_request_keyframes) {
1820           GST_DEBUG_OBJECT (mux,
1821               "Requesting keyframe for immediate time signal");
1822           request_keyframe (mux, GST_CLOCK_TIME_NONE);
1823         }
1824       }
1825       break;
1826     }
1827     case GST_MTS_SCTE_SPLICE_COMMAND_BANDWIDTH:
1828     case GST_MTS_SCTE_SPLICE_COMMAND_PRIVATE:
1829       /* Just let those go through untouched, none of our business */
1830       break;
1831     default:
1832       break;
1833   }
1834 
1835   if (!forward) {
1836     gst_mpegts_section_unref (section);
1837     return;
1838   }
1839 
1840   if (!translate) {
1841     g_assert (section->data);
1842     /* Calculate the final adjustment, as a sum of:
1843      * - The adjustment in the original packet
1844      * - The offset introduced between the original local PTS
1845      *   and the GStreamer PTS output by tsdemux
1846      * - Our own 1-hour offset
1847      */
1848     pts_adjust = sit->pts_adjustment + mpeg_pts_offset + TS_MUX_CLOCK_BASE;
1849 
1850     /* Account for offsets potentially introduced between the demuxer and us */
1851     pts_adjust +=
1852         GSTTIME_TO_MPEGTIME (gst_event_get_running_time_offset (event));
1853 
1854     pts_adjust &= 0x1ffffffff;
1855     section_data = g_memdup2 (section->data, section->section_length);
1856     section_data[4] |= pts_adjust >> 32;
1857     section_data[5] = pts_adjust >> 24;
1858     section_data[6] = pts_adjust >> 16;
1859     section_data[7] = pts_adjust >> 8;
1860     section_data[8] = pts_adjust;
1861 
1862     /* Now rewrite our checksum */
1863     crc = section_data + section->section_length - 4;
1864     GST_WRITE_UINT32_BE (crc, _calc_crc32 (section_data, crc - section_data));
1865 
1866     GST_OBJECT_LOCK (mux);
1867     GST_DEBUG_OBJECT (mux, "Storing SCTE section");
1868     if (mux->pending_scte35_section)
1869       gst_mpegts_section_unref (mux->pending_scte35_section);
1870     mux->pending_scte35_section =
1871         gst_mpegts_section_new (mux->scte35_pid, section_data,
1872         section->section_length);
1873     GST_OBJECT_UNLOCK (mux);
1874 
1875     gst_mpegts_section_unref (section);
1876   } else {
1877     GST_OBJECT_LOCK (mux);
1878     GST_DEBUG_OBJECT (mux, "Storing SCTE section");
1879     gst_mpegts_section_unref (section);
1880     if (mux->pending_scte35_section)
1881       gst_mpegts_section_unref (mux->pending_scte35_section);
1882     mux->pending_scte35_section =
1883         gst_mpegts_section_from_scte_sit (sit, mux->scte35_pid);;
1884     GST_OBJECT_UNLOCK (mux);
1885   }
1886 }
1887 
1888 static gboolean
gst_base_ts_mux_send_event(GstElement * element,GstEvent * event)1889 gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
1890 {
1891   GstMpegtsSection *section;
1892   GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
1893 
1894   section = gst_event_parse_mpegts_section (event);
1895 
1896   if (section) {
1897     GST_DEBUG ("Received event with mpegts section");
1898 
1899     if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
1900       handle_scte35_section (mux, event, section, 0, NULL);
1901     } else {
1902       g_mutex_lock (&mux->lock);
1903       /* TODO: Check that the section type is supported */
1904       tsmux_add_mpegts_si_section (mux->tsmux, section);
1905       g_mutex_unlock (&mux->lock);
1906     }
1907 
1908     gst_event_unref (event);
1909 
1910     return TRUE;
1911   }
1912 
1913   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1914 }
1915 
1916 /* GstAggregator implementation */
1917 
1918 static gboolean
gst_base_ts_mux_sink_event(GstAggregator * agg,GstAggregatorPad * agg_pad,GstEvent * event)1919 gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
1920     GstEvent * event)
1921 {
1922   GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
1923   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
1924   GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad);
1925   gboolean res = FALSE;
1926   gboolean forward = TRUE;
1927 
1928   switch (GST_EVENT_TYPE (event)) {
1929     case GST_EVENT_CAPS:
1930     {
1931       GstCaps *caps;
1932       GstFlowReturn ret;
1933       GList *cur;
1934 
1935       g_mutex_lock (&mux->lock);
1936       if (ts_pad->stream == NULL) {
1937         g_mutex_unlock (&mux->lock);
1938         break;
1939       }
1940 
1941       forward = FALSE;
1942 
1943       gst_event_parse_caps (event, &caps);
1944       if (!caps || !gst_caps_is_fixed (caps)) {
1945         g_mutex_unlock (&mux->lock);
1946         break;
1947       }
1948 
1949       ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
1950       if (ret != GST_FLOW_OK) {
1951         g_mutex_unlock (&mux->lock);
1952         break;
1953       }
1954 
1955       mux->tsmux->pat_changed = TRUE;
1956       mux->tsmux->si_changed = TRUE;
1957       tsmux_resend_pat (mux->tsmux);
1958       tsmux_resend_si (mux->tsmux);
1959 
1960       /* output PMT for each program */
1961       for (cur = mux->tsmux->programs; cur; cur = cur->next) {
1962         TsMuxProgram *program = (TsMuxProgram *) cur->data;
1963 
1964         program->pmt_changed = TRUE;
1965         tsmux_resend_pmt (program);
1966       }
1967       g_mutex_unlock (&mux->lock);
1968 
1969       res = TRUE;
1970       break;
1971     }
1972     case GST_EVENT_CUSTOM_DOWNSTREAM:
1973     {
1974       GstClockTime timestamp, stream_time, running_time;
1975       gboolean all_headers;
1976       guint count;
1977       const GstStructure *s;
1978 
1979       s = gst_event_get_structure (event);
1980 
1981       if (gst_structure_has_name (s, "scte-sit") && mux->scte35_pid != 0) {
1982 
1983         /* When operating downstream of tsdemux, tsdemux will send out events
1984          * on all its source pads for each splice table it encounters. If we
1985          * are remuxing multiple streams it has demuxed, this means we could
1986          * unnecessarily repeat the same table multiple times, we avoid that
1987          * by deduplicating thanks to the event sequm
1988          */
1989         if (gst_event_get_seqnum (event) != mux->last_scte35_event_seqnum) {
1990           GstMpegtsSection *section;
1991 
1992           gst_structure_get (s, "section", GST_TYPE_MPEGTS_SECTION, &section,
1993               NULL);
1994           if (section) {
1995             guint64 mpeg_pts_offset = 0;
1996             GstStructure *rtime_map = NULL;
1997 
1998             gst_structure_get (s, "running-time-map", GST_TYPE_STRUCTURE,
1999                 &rtime_map, NULL);
2000             gst_structure_get_uint64 (s, "mpeg-pts-offset", &mpeg_pts_offset);
2001 
2002             handle_scte35_section (mux, event, section, mpeg_pts_offset,
2003                 rtime_map);
2004             if (rtime_map)
2005               gst_structure_free (rtime_map);
2006             mux->last_scte35_event_seqnum = gst_event_get_seqnum (event);
2007           } else {
2008             GST_WARNING_OBJECT (ts_pad,
2009                 "Ignoring scte-sit event without a section");
2010           }
2011         } else {
2012           GST_DEBUG_OBJECT (ts_pad, "Ignoring duplicate scte-sit event");
2013         }
2014         res = TRUE;
2015         forward = FALSE;
2016         goto out;
2017       }
2018 
2019       if (!gst_video_event_is_force_key_unit (event))
2020         goto out;
2021 
2022       res = TRUE;
2023       forward = FALSE;
2024 
2025       gst_video_event_parse_downstream_force_key_unit (event,
2026           &timestamp, &stream_time, &running_time, &all_headers, &count);
2027       GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, "
2028           "seqnum %d, running-time %" GST_TIME_FORMAT " count %d",
2029           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count);
2030 
2031       if (mux->force_key_unit_event != NULL) {
2032         GST_INFO_OBJECT (mux, "skipping downstream force key unit event "
2033             "as an upstream force key unit is already queued");
2034         goto out;
2035       }
2036 
2037       if (!all_headers)
2038         goto out;
2039 
2040       mux->pending_key_unit_ts = running_time;
2041       gst_event_replace (&mux->force_key_unit_event, event);
2042       break;
2043     }
2044     case GST_EVENT_TAG:{
2045       GstTagList *list;
2046       gchar *lang = NULL;
2047 
2048       GST_DEBUG_OBJECT (mux, "received tag event");
2049       gst_event_parse_tag (event, &list);
2050 
2051       /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */
2052       if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) {
2053         const gchar *lang_code;
2054 
2055         lang_code = gst_tag_get_language_code_iso_639_2B (lang);
2056         if (lang_code) {
2057           GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code);
2058 
2059           g_free (ts_pad->language);
2060           ts_pad->language = g_strdup (lang_code);
2061         } else {
2062           GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'",
2063               lang);
2064         }
2065         g_free (lang);
2066       }
2067 
2068       /* handled this, don't want collectpads to forward it downstream */
2069       res = TRUE;
2070       forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL;
2071       break;
2072     }
2073     case GST_EVENT_STREAM_START:{
2074       GstStreamFlags flags;
2075 
2076       gst_event_parse_stream_flags (event, &flags);
2077 
2078       /* Don't wait for data on sparse inputs like metadata streams */
2079       /*
2080          if ((flags & GST_STREAM_FLAG_SPARSE)) {
2081          GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED);
2082          gst_collect_pads_set_waiting (pads, data, FALSE);
2083          GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED);
2084          }
2085        */
2086       break;
2087     }
2088     default:
2089       break;
2090   }
2091 
2092 out:
2093   if (!forward)
2094     gst_event_unref (event);
2095   else
2096     res = agg_class->sink_event (agg, agg_pad, event);
2097 
2098   return res;
2099 }
2100 
2101 static gboolean
gst_base_ts_mux_src_event(GstAggregator * agg,GstEvent * event)2102 gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
2103 {
2104   GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class);
2105   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2106   gboolean res = TRUE, forward = TRUE;
2107 
2108   switch (GST_EVENT_TYPE (event)) {
2109     case GST_EVENT_CUSTOM_UPSTREAM:
2110     {
2111       GstIterator *iter;
2112       GValue sinkpad_value = G_VALUE_INIT;
2113       GstClockTime running_time;
2114       gboolean all_headers, done = FALSE;
2115       guint count;
2116 
2117       if (!gst_video_event_is_force_key_unit (event))
2118         break;
2119 
2120       forward = FALSE;
2121 
2122       gst_video_event_parse_upstream_force_key_unit (event,
2123           &running_time, &all_headers, &count);
2124 
2125       GST_INFO_OBJECT (mux, "received upstream force-key-unit event, "
2126           "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d",
2127           gst_event_get_seqnum (event), GST_TIME_ARGS (running_time),
2128           all_headers, count);
2129 
2130       if (!all_headers)
2131         break;
2132 
2133       mux->pending_key_unit_ts = running_time;
2134       gst_event_replace (&mux->force_key_unit_event, event);
2135 
2136       iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
2137 
2138       while (!done) {
2139         switch (gst_iterator_next (iter, &sinkpad_value)) {
2140           case GST_ITERATOR_OK:{
2141             GstPad *sinkpad = g_value_get_object (&sinkpad_value);
2142             gboolean tmp;
2143 
2144             GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
2145             tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
2146             GST_INFO_OBJECT (mux, "result %d", tmp);
2147             /* succeed if at least one pad succeeds */
2148             res |= tmp;
2149             break;
2150           }
2151           case GST_ITERATOR_DONE:
2152             done = TRUE;
2153             break;
2154           case GST_ITERATOR_RESYNC:
2155             gst_iterator_resync (iter);
2156             break;
2157           case GST_ITERATOR_ERROR:
2158             g_assert_not_reached ();
2159             break;
2160         }
2161         g_value_reset (&sinkpad_value);
2162       }
2163       g_value_unset (&sinkpad_value);
2164       gst_iterator_free (iter);
2165       break;
2166     }
2167     default:
2168       break;
2169   }
2170 
2171   if (forward)
2172     res = agg_class->src_event (agg, event);
2173   else
2174     gst_event_unref (event);
2175 
2176   return res;
2177 }
2178 
2179 static GstBuffer *
gst_base_ts_mux_clip(GstAggregator * agg,GstAggregatorPad * agg_pad,GstBuffer * buf)2180 gst_base_ts_mux_clip (GstAggregator * agg,
2181     GstAggregatorPad * agg_pad, GstBuffer * buf)
2182 {
2183   GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad);
2184   GstClockTime time;
2185   GstBuffer *ret;
2186 
2187   ret = buf;
2188 
2189   /* PTS */
2190   time = GST_BUFFER_PTS (buf);
2191 
2192   /* invalid left alone and passed */
2193   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
2194     time =
2195         gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time);
2196     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) {
2197       GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment");
2198       gst_buffer_unref (buf);
2199       ret = NULL;
2200       goto beach;
2201     } else {
2202       GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " ->  %"
2203           GST_TIME_FORMAT " running time",
2204           GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time));
2205       buf = ret = gst_buffer_make_writable (buf);
2206       GST_BUFFER_PTS (ret) = time;
2207     }
2208   }
2209 
2210   /* DTS */
2211   time = GST_BUFFER_DTS (buf);
2212 
2213   /* invalid left alone and passed */
2214   if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) {
2215     gint sign;
2216     gint64 dts;
2217 
2218     sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME,
2219         time, &time);
2220 
2221     if (sign > 0)
2222       dts = (gint64) time;
2223     else
2224       dts = -((gint64) time);
2225 
2226     GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %"
2227         GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
2228         GST_STIME_ARGS (dts));
2229 
2230     if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) {
2231       /* Ignore DTS going backward */
2232       GST_WARNING_OBJECT (pad, "ignoring DTS going backward");
2233       dts = pad->dts;
2234     }
2235 
2236     ret = gst_buffer_make_writable (buf);
2237     if (sign > 0)
2238       GST_BUFFER_DTS (ret) = time;
2239     else
2240       GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE;
2241 
2242     pad->dts = dts;
2243   } else {
2244     pad->dts = GST_CLOCK_STIME_NONE;
2245   }
2246 
2247 beach:
2248   return ret;
2249 }
2250 
2251 static GstBaseTsMuxPad *
gst_base_ts_mux_find_best_pad(GstAggregator * aggregator)2252 gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
2253 {
2254   GstBaseTsMuxPad *best = NULL;
2255   GstClockTime best_ts = GST_CLOCK_TIME_NONE;
2256   GList *l;
2257 
2258   GST_OBJECT_LOCK (aggregator);
2259 
2260   for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
2261     GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
2262     GstAggregatorPad *apad = GST_AGGREGATOR_PAD_CAST (tpad);
2263     GstBuffer *buffer;
2264 
2265     buffer = gst_aggregator_pad_peek_buffer (apad);
2266     if (!buffer)
2267       continue;
2268     if (best_ts == GST_CLOCK_TIME_NONE) {
2269       best = tpad;
2270       best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
2271     } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
2272       GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
2273       if (t < best_ts) {
2274         best = tpad;
2275         best_ts = t;
2276       }
2277     }
2278     gst_buffer_unref (buffer);
2279   }
2280 
2281   if (best)
2282     gst_object_ref (best);
2283 
2284   GST_OBJECT_UNLOCK (aggregator);
2285 
2286   GST_DEBUG_OBJECT (aggregator,
2287       "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
2288       GST_TIME_ARGS (best_ts), best);
2289 
2290   return best;
2291 }
2292 
2293 static gboolean
gst_base_ts_mux_are_all_pads_eos(GstBaseTsMux * mux)2294 gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
2295 {
2296   GList *l;
2297   gboolean ret = TRUE;
2298 
2299   GST_OBJECT_LOCK (mux);
2300 
2301   for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
2302     GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
2303 
2304     if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
2305       ret = FALSE;
2306       break;
2307     }
2308   }
2309 
2310   GST_OBJECT_UNLOCK (mux);
2311 
2312   return ret;
2313 }
2314 
2315 
2316 static GstFlowReturn
gst_base_ts_mux_aggregate(GstAggregator * agg,gboolean timeout)2317 gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
2318 {
2319   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2320   GstFlowReturn ret = GST_FLOW_OK;
2321   GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg);
2322   GstCaps *caps;
2323 
2324   /* set caps on the srcpad if no caps were set yet */
2325   if (!(caps = gst_pad_get_current_caps (agg->srcpad))) {
2326     GstStructure *structure;
2327 
2328     caps = gst_pad_get_pad_template_caps (GST_AGGREGATOR_SRC_PAD (mux));
2329     caps = gst_caps_make_writable (caps);
2330     structure = gst_caps_get_structure (caps, 0);
2331     gst_structure_set (structure, "packetsize", G_TYPE_INT, mux->packet_size,
2332         NULL);
2333 
2334     gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps);
2335   }
2336   gst_caps_unref (caps);
2337 
2338   if (best) {
2339     GstBuffer *buffer;
2340 
2341     buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best));
2342     if (!buffer) {
2343       /* We might have gotten a flush event after we picked the pad */
2344       goto done;
2345     }
2346 
2347     ret =
2348         gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
2349         GST_AGGREGATOR_PAD (best), buffer);
2350 
2351     gst_object_unref (best);
2352 
2353     if (ret != GST_FLOW_OK)
2354       goto done;
2355   }
2356 
2357   if (gst_base_ts_mux_are_all_pads_eos (mux)) {
2358     GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux);
2359     /* drain some possibly cached data */
2360     if (klass->drain)
2361       klass->drain (mux);
2362     gst_base_ts_mux_push_packets (mux, TRUE);
2363 
2364     ret = GST_FLOW_EOS;
2365   }
2366 
2367 done:
2368   return ret;
2369 }
2370 
2371 static gboolean
gst_base_ts_mux_start(GstAggregator * agg)2372 gst_base_ts_mux_start (GstAggregator * agg)
2373 {
2374   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2375 
2376   g_mutex_lock (&mux->lock);
2377   gst_base_ts_mux_reset (mux, TRUE);
2378   g_mutex_unlock (&mux->lock);
2379 
2380   return TRUE;
2381 }
2382 
2383 static gboolean
gst_base_ts_mux_stop(GstAggregator * agg)2384 gst_base_ts_mux_stop (GstAggregator * agg)
2385 {
2386   GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
2387 
2388   g_mutex_lock (&mux->lock);
2389   gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
2390   g_mutex_unlock (&mux->lock);
2391 
2392   return TRUE;
2393 }
2394 
2395 /* GObject implementation */
2396 
2397 static void
gst_base_ts_mux_dispose(GObject * object)2398 gst_base_ts_mux_dispose (GObject * object)
2399 {
2400   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2401 
2402   g_mutex_lock (&mux->lock);
2403   gst_base_ts_mux_reset (mux, FALSE);
2404 
2405   if (mux->out_adapter) {
2406     g_object_unref (mux->out_adapter);
2407     mux->out_adapter = NULL;
2408   }
2409   if (mux->prog_map) {
2410     gst_structure_free (mux->prog_map);
2411     mux->prog_map = NULL;
2412   }
2413   if (mux->programs) {
2414     g_hash_table_destroy (mux->programs);
2415     mux->programs = NULL;
2416   }
2417   g_mutex_unlock (&mux->lock);
2418   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
2419 }
2420 
2421 static void
gst_base_ts_mux_finalize(GObject * object)2422 gst_base_ts_mux_finalize (GObject * object)
2423 {
2424   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2425 
2426   g_mutex_clear (&mux->lock);
2427   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
2428 }
2429 
2430 static void
gst_base_ts_mux_constructed(GObject * object)2431 gst_base_ts_mux_constructed (GObject * object)
2432 {
2433   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2434 
2435   /* initial state */
2436   g_mutex_lock (&mux->lock);
2437   gst_base_ts_mux_reset (mux, TRUE);
2438   g_mutex_unlock (&mux->lock);
2439 }
2440 
2441 static void
gst_base_ts_mux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)2442 gst_base_ts_mux_set_property (GObject * object, guint prop_id,
2443     const GValue * value, GParamSpec * pspec)
2444 {
2445   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2446   GList *l;
2447 
2448   switch (prop_id) {
2449     case PROP_PROG_MAP:
2450     {
2451       const GstStructure *s = gst_value_get_structure (value);
2452       if (mux->prog_map) {
2453         gst_structure_free (mux->prog_map);
2454       }
2455       if (s)
2456         mux->prog_map = gst_structure_copy (s);
2457       else
2458         mux->prog_map = NULL;
2459       break;
2460     }
2461     case PROP_PAT_INTERVAL:
2462       mux->pat_interval = g_value_get_uint (value);
2463       g_mutex_lock (&mux->lock);
2464       if (mux->tsmux)
2465         tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
2466       g_mutex_unlock (&mux->lock);
2467       break;
2468     case PROP_PMT_INTERVAL:
2469       mux->pmt_interval = g_value_get_uint (value);
2470       GST_OBJECT_LOCK (mux);
2471       for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
2472         GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
2473 
2474         g_mutex_lock (&mux->lock);
2475         tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
2476         g_mutex_unlock (&mux->lock);
2477       }
2478       GST_OBJECT_UNLOCK (mux);
2479       break;
2480     case PROP_ALIGNMENT:
2481       mux->alignment = g_value_get_int (value);
2482       break;
2483     case PROP_SI_INTERVAL:
2484       mux->si_interval = g_value_get_uint (value);
2485       g_mutex_lock (&mux->lock);
2486       tsmux_set_si_interval (mux->tsmux, mux->si_interval);
2487       g_mutex_unlock (&mux->lock);
2488       break;
2489     case PROP_BITRATE:
2490       mux->bitrate = g_value_get_uint64 (value);
2491       g_mutex_lock (&mux->lock);
2492       if (mux->tsmux)
2493         tsmux_set_bitrate (mux->tsmux, mux->bitrate);
2494       g_mutex_unlock (&mux->lock);
2495       break;
2496     case PROP_PCR_INTERVAL:
2497       mux->pcr_interval = g_value_get_uint (value);
2498       g_mutex_lock (&mux->lock);
2499       if (mux->tsmux)
2500         tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
2501       g_mutex_unlock (&mux->lock);
2502       break;
2503     case PROP_SCTE_35_PID:
2504       mux->scte35_pid = g_value_get_uint (value);
2505       break;
2506     case PROP_SCTE_35_NULL_INTERVAL:
2507       mux->scte35_null_interval = g_value_get_uint (value);
2508       break;
2509     default:
2510       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2511       break;
2512   }
2513 }
2514 
2515 static void
gst_base_ts_mux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)2516 gst_base_ts_mux_get_property (GObject * object, guint prop_id,
2517     GValue * value, GParamSpec * pspec)
2518 {
2519   GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
2520 
2521   switch (prop_id) {
2522     case PROP_PROG_MAP:
2523       gst_value_set_structure (value, mux->prog_map);
2524       break;
2525     case PROP_PAT_INTERVAL:
2526       g_value_set_uint (value, mux->pat_interval);
2527       break;
2528     case PROP_PMT_INTERVAL:
2529       g_value_set_uint (value, mux->pmt_interval);
2530       break;
2531     case PROP_ALIGNMENT:
2532       g_value_set_int (value, mux->alignment);
2533       break;
2534     case PROP_SI_INTERVAL:
2535       g_value_set_uint (value, mux->si_interval);
2536       break;
2537     case PROP_BITRATE:
2538       g_value_set_uint64 (value, mux->bitrate);
2539       break;
2540     case PROP_PCR_INTERVAL:
2541       g_value_set_uint (value, mux->pcr_interval);
2542       break;
2543     case PROP_SCTE_35_PID:
2544       g_value_set_uint (value, mux->scte35_pid);
2545       break;
2546     case PROP_SCTE_35_NULL_INTERVAL:
2547       g_value_set_uint (value, mux->scte35_null_interval);
2548       break;
2549     default:
2550       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2551       break;
2552   }
2553 }
2554 
2555 /* Default vmethods implementation */
2556 
2557 static TsMux *
gst_base_ts_mux_default_create_ts_mux(GstBaseTsMux * mux)2558 gst_base_ts_mux_default_create_ts_mux (GstBaseTsMux * mux)
2559 {
2560   TsMux *tsmux = tsmux_new ();
2561   tsmux_set_write_func (tsmux, new_packet_cb, mux);
2562   tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux);
2563   tsmux_set_pat_interval (tsmux, mux->pat_interval);
2564   tsmux_set_si_interval (tsmux, mux->si_interval);
2565   tsmux_set_bitrate (tsmux, mux->bitrate);
2566   tsmux_set_pcr_interval (tsmux, mux->pcr_interval);
2567 
2568   return tsmux;
2569 }
2570 
2571 static void
gst_base_ts_mux_default_allocate_packet(GstBaseTsMux * mux,GstBuffer ** buffer)2572 gst_base_ts_mux_default_allocate_packet (GstBaseTsMux * mux,
2573     GstBuffer ** buffer)
2574 {
2575   GstBuffer *buf;
2576 
2577   buf = gst_buffer_new_and_alloc (mux->packet_size);
2578 
2579   *buffer = buf;
2580 }
2581 
2582 static gboolean
gst_base_ts_mux_default_output_packet(GstBaseTsMux * mux,GstBuffer * buffer,gint64 new_pcr)2583 gst_base_ts_mux_default_output_packet (GstBaseTsMux * mux, GstBuffer * buffer,
2584     gint64 new_pcr)
2585 {
2586   gst_base_ts_mux_collect_packet (mux, buffer);
2587 
2588   return TRUE;
2589 }
2590 
2591 /* Subclass API */
2592 
2593 void
gst_base_ts_mux_set_packet_size(GstBaseTsMux * mux,gsize size)2594 gst_base_ts_mux_set_packet_size (GstBaseTsMux * mux, gsize size)
2595 {
2596   mux->packet_size = size;
2597 }
2598 
2599 void
gst_base_ts_mux_set_automatic_alignment(GstBaseTsMux * mux,gsize alignment)2600 gst_base_ts_mux_set_automatic_alignment (GstBaseTsMux * mux, gsize alignment)
2601 {
2602   mux->automatic_alignment = alignment;
2603 }
2604 
2605 static void
gst_base_ts_mux_class_init(GstBaseTsMuxClass * klass)2606 gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
2607 {
2608   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
2609   GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass);
2610   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
2611 
2612   GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0,
2613       "MPEG Transport Stream muxer");
2614 
2615   gst_element_class_set_static_metadata (gstelement_class,
2616       "MPEG Transport Stream Muxer", "Codec/Muxer",
2617       "Multiplexes media streams into an MPEG Transport Stream",
2618       "Fluendo <contact@fluendo.com>");
2619 
2620   gobject_class->set_property =
2621       GST_DEBUG_FUNCPTR (gst_base_ts_mux_set_property);
2622   gobject_class->get_property =
2623       GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
2624   gobject_class->dispose = gst_base_ts_mux_dispose;
2625   gobject_class->finalize = gst_base_ts_mux_finalize;
2626   gobject_class->constructed = gst_base_ts_mux_constructed;
2627 
2628   gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
2629   gstelement_class->release_pad = gst_base_ts_mux_release_pad;
2630   gstelement_class->send_event = gst_base_ts_mux_send_event;
2631 
2632   gstagg_class->negotiate = NULL;
2633   gstagg_class->aggregate = gst_base_ts_mux_aggregate;
2634   gstagg_class->clip = gst_base_ts_mux_clip;
2635   gstagg_class->sink_event = gst_base_ts_mux_sink_event;
2636   gstagg_class->src_event = gst_base_ts_mux_src_event;
2637   gstagg_class->start = gst_base_ts_mux_start;
2638   gstagg_class->stop = gst_base_ts_mux_stop;
2639 
2640   klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux;
2641   klass->allocate_packet = gst_base_ts_mux_default_allocate_packet;
2642   klass->output_packet = gst_base_ts_mux_default_output_packet;
2643 
2644   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PROG_MAP,
2645       g_param_spec_boxed ("prog-map", "Program map",
2646           "A GstStructure specifies the mapping from elementary streams to programs",
2647           GST_TYPE_STRUCTURE,
2648           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2649 
2650   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PAT_INTERVAL,
2651       g_param_spec_uint ("pat-interval", "PAT interval",
2652           "Set the interval (in ticks of the 90kHz clock) for writing out the PAT table",
2653           1, G_MAXUINT, TSMUX_DEFAULT_PAT_INTERVAL,
2654           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2655 
2656   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PMT_INTERVAL,
2657       g_param_spec_uint ("pmt-interval", "PMT interval",
2658           "Set the interval (in ticks of the 90kHz clock) for writing out the PMT table",
2659           1, G_MAXUINT, TSMUX_DEFAULT_PMT_INTERVAL,
2660           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2661 
2662   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_ALIGNMENT,
2663       g_param_spec_int ("alignment", "packet alignment",
2664           "Number of packets per buffer (padded with dummy packets on EOS) "
2665           "(-1 = auto, 0 = all available packets, 7 for UDP streaming)",
2666           -1, G_MAXINT, BASETSMUX_DEFAULT_ALIGNMENT,
2667           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2668 
2669   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SI_INTERVAL,
2670       g_param_spec_uint ("si-interval", "SI interval",
2671           "Set the interval (in ticks of the 90kHz clock) for writing out the Service"
2672           "Information tables", 1, G_MAXUINT, TSMUX_DEFAULT_SI_INTERVAL,
2673           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2674 
2675   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BITRATE,
2676       g_param_spec_uint64 ("bitrate", "Bitrate (in bits per second)",
2677           "Set the target bitrate, will insert null packets as padding "
2678           " to achieve multiplex-wide constant bitrate (0 means no padding)",
2679           0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE,
2680           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2681 
2682   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PCR_INTERVAL,
2683       g_param_spec_uint ("pcr-interval", "PCR interval",
2684           "Set the interval (in ticks of the 90kHz clock) for writing PCR",
2685           1, G_MAXUINT, TSMUX_DEFAULT_PCR_INTERVAL,
2686           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2687 
2688   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SCTE_35_PID,
2689       g_param_spec_uint ("scte-35-pid", "SCTE-35 PID",
2690           "PID to use for inserting SCTE-35 packets (0: unused)",
2691           0, G_MAXUINT, DEFAULT_SCTE_35_PID,
2692           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2693 
2694   g_object_class_install_property (G_OBJECT_CLASS (klass),
2695       PROP_SCTE_35_NULL_INTERVAL, g_param_spec_uint ("scte-35-null-interval",
2696           "SCTE-35 NULL packet interval",
2697           "Set the interval (in ticks of the 90kHz clock) for writing SCTE-35 NULL (heartbeat) packets."
2698           " (only valid if scte-35-pid is different from 0)", 1, G_MAXUINT,
2699           TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL,
2700           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
2701 
2702   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
2703       &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD);
2704 
2705   gst_type_mark_as_plugin_api (GST_TYPE_BASE_TS_MUX_PAD, 0);
2706 }
2707 
2708 static void
gst_base_ts_mux_init(GstBaseTsMux * mux)2709 gst_base_ts_mux_init (GstBaseTsMux * mux)
2710 {
2711   mux->out_adapter = gst_adapter_new ();
2712 
2713   /* properties */
2714   mux->pat_interval = TSMUX_DEFAULT_PAT_INTERVAL;
2715   mux->pmt_interval = TSMUX_DEFAULT_PMT_INTERVAL;
2716   mux->si_interval = TSMUX_DEFAULT_SI_INTERVAL;
2717   mux->pcr_interval = TSMUX_DEFAULT_PCR_INTERVAL;
2718   mux->prog_map = NULL;
2719   mux->alignment = BASETSMUX_DEFAULT_ALIGNMENT;
2720   mux->bitrate = TSMUX_DEFAULT_BITRATE;
2721   mux->scte35_pid = DEFAULT_SCTE_35_PID;
2722   mux->scte35_null_interval = TSMUX_DEFAULT_SCTE_35_NULL_INTERVAL;
2723 
2724   mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
2725   mux->automatic_alignment = 0;
2726 
2727   g_mutex_init (&mux->lock);
2728 }
2729