• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70 
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74 
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79 
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82 
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85 
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90 
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93 
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98 
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124 
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137 
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143 
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155 
156 static guint signals[SIGNAL_LAST];
157 
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183 
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202 
203 static void
_do_init(void)204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212 
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218 
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227 
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231 
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234 
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241 
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245 
246 static void do_async_done (GstSplitMuxSink * splitmux);
247 
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251 
252 static MqStreamBuf *
mq_stream_buf_new(void)253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257 
258 static void
mq_stream_buf_free(MqStreamBuf * data)259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263 
264 static SplitMuxOutputCommand *
out_cmd_buf_new(void)265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269 
270 static void
out_cmd_buf_free(SplitMuxOutputCommand * data)271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275 
276 static void
input_gop_free(InputGop * gop)277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282 
283 static void
gst_splitmux_sink_class_init(GstSplitMuxSinkClass * klass)284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289 
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294 
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299 
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310 
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317 
318   gstbin_class->handle_message = bus_handler;
319 
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329 
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369 
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380 
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392 
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398 
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457 
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477 
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491 
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510 
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525 
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540 
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566 
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577 
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588 
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593 
594 static void
gst_splitmux_sink_init(GstSplitMuxSink * splitmux)595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602 
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611 
612   splitmux->threshold_timecode_str = NULL;
613 
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619 
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625 
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628 
629 static void
gst_splitmux_reset_elements(GstSplitMuxSink * splitmux)630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642 
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645 
646 static void
gst_splitmux_sink_dispose(GObject * object)647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650 
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653 
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656 
657 static void
gst_splitmux_sink_finalize(GObject * object)658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661 
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670 
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672 
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675 
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680 
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693 
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698 
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701 
702   g_free (splitmux->location);
703 
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708 
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711 
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
update_muxer_properties(GstSplitMuxSink * sink)718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722 
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733 
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739 
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746 
747 static void
gst_splitmux_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752 
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782 
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time =
796           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797           splitmux->fragment_start_time, NULL);
798       if (splitmux->tc_interval && splitmux->fragment_start_tc
799           && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800         GST_WARNING_OBJECT (splitmux,
801             "Couldn't calculate next fragment start time for timecode mode");
802       }
803       GST_OBJECT_UNLOCK (splitmux);
804       break;
805     case PROP_SEND_KEYFRAME_REQUESTS:
806       GST_OBJECT_LOCK (splitmux);
807       splitmux->send_keyframe_requests = g_value_get_boolean (value);
808       GST_OBJECT_UNLOCK (splitmux);
809       break;
810     case PROP_MAX_FILES:
811       GST_OBJECT_LOCK (splitmux);
812       splitmux->max_files = g_value_get_uint (value);
813       GST_OBJECT_UNLOCK (splitmux);
814       break;
815     case PROP_MUXER_OVERHEAD:
816       GST_OBJECT_LOCK (splitmux);
817       splitmux->mux_overhead = g_value_get_double (value);
818       GST_OBJECT_UNLOCK (splitmux);
819       break;
820     case PROP_USE_ROBUST_MUXING:
821       GST_OBJECT_LOCK (splitmux);
822       splitmux->use_robust_muxing = g_value_get_boolean (value);
823       GST_OBJECT_UNLOCK (splitmux);
824       if (splitmux->use_robust_muxing)
825         update_muxer_properties (splitmux);
826       break;
827     case PROP_ALIGNMENT_THRESHOLD:
828       GST_OBJECT_LOCK (splitmux);
829       splitmux->alignment_threshold = g_value_get_uint64 (value);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK:
833       GST_OBJECT_LOCK (splitmux);
834       gst_clear_object (&splitmux->provided_sink);
835       splitmux->provided_sink = g_value_get_object (value);
836       if (splitmux->provided_sink)
837         gst_object_ref_sink (splitmux->provided_sink);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_MUXER:
841       GST_OBJECT_LOCK (splitmux);
842       gst_clear_object (&splitmux->provided_muxer);
843       splitmux->provided_muxer = g_value_get_object (value);
844       if (splitmux->provided_muxer)
845         gst_object_ref_sink (splitmux->provided_muxer);
846       GST_OBJECT_UNLOCK (splitmux);
847       break;
848     case PROP_RESET_MUXER:
849       GST_OBJECT_LOCK (splitmux);
850       splitmux->reset_muxer = g_value_get_boolean (value);
851       GST_OBJECT_UNLOCK (splitmux);
852       break;
853     case PROP_ASYNC_FINALIZE:
854       GST_OBJECT_LOCK (splitmux);
855       splitmux->async_finalize = g_value_get_boolean (value);
856       GST_OBJECT_UNLOCK (splitmux);
857       break;
858     case PROP_MUXER_FACTORY:
859       GST_OBJECT_LOCK (splitmux);
860       if (splitmux->muxer_factory)
861         g_free (splitmux->muxer_factory);
862       splitmux->muxer_factory = g_value_dup_string (value);
863       GST_OBJECT_UNLOCK (splitmux);
864       break;
865     case PROP_MUXER_PRESET:
866       GST_OBJECT_LOCK (splitmux);
867       if (splitmux->muxer_preset)
868         g_free (splitmux->muxer_preset);
869       splitmux->muxer_preset = g_value_dup_string (value);
870       GST_OBJECT_UNLOCK (splitmux);
871       break;
872     case PROP_MUXER_PROPERTIES:
873       GST_OBJECT_LOCK (splitmux);
874       if (splitmux->muxer_properties)
875         gst_structure_free (splitmux->muxer_properties);
876       if (gst_value_get_structure (value))
877         splitmux->muxer_properties =
878             gst_structure_copy (gst_value_get_structure (value));
879       else
880         splitmux->muxer_properties = NULL;
881       GST_OBJECT_UNLOCK (splitmux);
882       break;
883     case PROP_SINK_FACTORY:
884       GST_OBJECT_LOCK (splitmux);
885       if (splitmux->sink_factory)
886         g_free (splitmux->sink_factory);
887       splitmux->sink_factory = g_value_dup_string (value);
888       GST_OBJECT_UNLOCK (splitmux);
889       break;
890     case PROP_SINK_PRESET:
891       GST_OBJECT_LOCK (splitmux);
892       if (splitmux->sink_preset)
893         g_free (splitmux->sink_preset);
894       splitmux->sink_preset = g_value_dup_string (value);
895       GST_OBJECT_UNLOCK (splitmux);
896       break;
897     case PROP_SINK_PROPERTIES:
898       GST_OBJECT_LOCK (splitmux);
899       if (splitmux->sink_properties)
900         gst_structure_free (splitmux->sink_properties);
901       if (gst_value_get_structure (value))
902         splitmux->sink_properties =
903             gst_structure_copy (gst_value_get_structure (value));
904       else
905         splitmux->sink_properties = NULL;
906       GST_OBJECT_UNLOCK (splitmux);
907       break;
908     case PROP_MUXERPAD_MAP:
909     {
910       const GstStructure *s = gst_value_get_structure (value);
911       GST_SPLITMUX_LOCK (splitmux);
912       if (splitmux->muxerpad_map) {
913         gst_structure_free (splitmux->muxerpad_map);
914       }
915       if (s)
916         splitmux->muxerpad_map = gst_structure_copy (s);
917       else
918         splitmux->muxerpad_map = NULL;
919       GST_SPLITMUX_UNLOCK (splitmux);
920       break;
921     }
922     default:
923       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924       break;
925   }
926 }
927 
928 static void
gst_splitmux_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930     GValue * value, GParamSpec * pspec)
931 {
932   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
933 
934   switch (prop_id) {
935     case PROP_LOCATION:
936       GST_OBJECT_LOCK (splitmux);
937       g_value_set_string (value, splitmux->location);
938       GST_OBJECT_UNLOCK (splitmux);
939       break;
940     case PROP_START_INDEX:
941       GST_OBJECT_LOCK (splitmux);
942       g_value_set_int (value, splitmux->start_index);
943       GST_OBJECT_UNLOCK (splitmux);
944       break;
945     case PROP_MAX_SIZE_BYTES:
946       GST_OBJECT_LOCK (splitmux);
947       g_value_set_uint64 (value, splitmux->threshold_bytes);
948       GST_OBJECT_UNLOCK (splitmux);
949       break;
950     case PROP_MAX_SIZE_TIME:
951       GST_OBJECT_LOCK (splitmux);
952       g_value_set_uint64 (value, splitmux->threshold_time);
953       GST_OBJECT_UNLOCK (splitmux);
954       break;
955     case PROP_MAX_SIZE_TIMECODE:
956       GST_OBJECT_LOCK (splitmux);
957       g_value_set_string (value, splitmux->threshold_timecode_str);
958       GST_OBJECT_UNLOCK (splitmux);
959       break;
960     case PROP_SEND_KEYFRAME_REQUESTS:
961       GST_OBJECT_LOCK (splitmux);
962       g_value_set_boolean (value, splitmux->send_keyframe_requests);
963       GST_OBJECT_UNLOCK (splitmux);
964       break;
965     case PROP_MAX_FILES:
966       GST_OBJECT_LOCK (splitmux);
967       g_value_set_uint (value, splitmux->max_files);
968       GST_OBJECT_UNLOCK (splitmux);
969       break;
970     case PROP_MUXER_OVERHEAD:
971       GST_OBJECT_LOCK (splitmux);
972       g_value_set_double (value, splitmux->mux_overhead);
973       GST_OBJECT_UNLOCK (splitmux);
974       break;
975     case PROP_USE_ROBUST_MUXING:
976       GST_OBJECT_LOCK (splitmux);
977       g_value_set_boolean (value, splitmux->use_robust_muxing);
978       GST_OBJECT_UNLOCK (splitmux);
979       break;
980     case PROP_ALIGNMENT_THRESHOLD:
981       GST_OBJECT_LOCK (splitmux);
982       g_value_set_uint64 (value, splitmux->alignment_threshold);
983       GST_OBJECT_UNLOCK (splitmux);
984       break;
985     case PROP_SINK:
986       GST_OBJECT_LOCK (splitmux);
987       g_value_set_object (value, splitmux->provided_sink);
988       GST_OBJECT_UNLOCK (splitmux);
989       break;
990     case PROP_MUXER:
991       GST_OBJECT_LOCK (splitmux);
992       g_value_set_object (value, splitmux->provided_muxer);
993       GST_OBJECT_UNLOCK (splitmux);
994       break;
995     case PROP_RESET_MUXER:
996       GST_OBJECT_LOCK (splitmux);
997       g_value_set_boolean (value, splitmux->reset_muxer);
998       GST_OBJECT_UNLOCK (splitmux);
999       break;
1000     case PROP_ASYNC_FINALIZE:
1001       GST_OBJECT_LOCK (splitmux);
1002       g_value_set_boolean (value, splitmux->async_finalize);
1003       GST_OBJECT_UNLOCK (splitmux);
1004       break;
1005     case PROP_MUXER_FACTORY:
1006       GST_OBJECT_LOCK (splitmux);
1007       g_value_set_string (value, splitmux->muxer_factory);
1008       GST_OBJECT_UNLOCK (splitmux);
1009       break;
1010     case PROP_MUXER_PRESET:
1011       GST_OBJECT_LOCK (splitmux);
1012       g_value_set_string (value, splitmux->muxer_preset);
1013       GST_OBJECT_UNLOCK (splitmux);
1014       break;
1015     case PROP_MUXER_PROPERTIES:
1016       GST_OBJECT_LOCK (splitmux);
1017       gst_value_set_structure (value, splitmux->muxer_properties);
1018       GST_OBJECT_UNLOCK (splitmux);
1019       break;
1020     case PROP_SINK_FACTORY:
1021       GST_OBJECT_LOCK (splitmux);
1022       g_value_set_string (value, splitmux->sink_factory);
1023       GST_OBJECT_UNLOCK (splitmux);
1024       break;
1025     case PROP_SINK_PRESET:
1026       GST_OBJECT_LOCK (splitmux);
1027       g_value_set_string (value, splitmux->sink_preset);
1028       GST_OBJECT_UNLOCK (splitmux);
1029       break;
1030     case PROP_SINK_PROPERTIES:
1031       GST_OBJECT_LOCK (splitmux);
1032       gst_value_set_structure (value, splitmux->sink_properties);
1033       GST_OBJECT_UNLOCK (splitmux);
1034       break;
1035     case PROP_MUXERPAD_MAP:
1036       GST_SPLITMUX_LOCK (splitmux);
1037       gst_value_set_structure (value, splitmux->muxerpad_map);
1038       GST_SPLITMUX_UNLOCK (splitmux);
1039       break;
1040     default:
1041       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1042       break;
1043   }
1044 }
1045 
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1049 {
1050   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1051 
1052   if (GST_CLOCK_TIME_IS_VALID (val)) {
1053     gboolean sign =
1054         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1055     if (sign > 0)
1056       res = val;
1057     else if (sign < 0)
1058       res = -val;
1059   }
1060   return res;
1061 }
1062 
1063 static void
mq_stream_ctx_reset(MqStreamCtx * ctx)1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1065 {
1066   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070   g_queue_clear (&ctx->queued_bufs);
1071 }
1072 
1073 static MqStreamCtx *
mq_stream_ctx_new(GstSplitMuxSink * splitmux)1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1075 {
1076   MqStreamCtx *ctx;
1077 
1078   ctx = g_new0 (MqStreamCtx, 1);
1079   ctx->splitmux = splitmux;
1080   g_queue_init (&ctx->queued_bufs);
1081   mq_stream_ctx_reset (ctx);
1082 
1083   return ctx;
1084 }
1085 
1086 static void
mq_stream_ctx_free(MqStreamCtx * ctx)1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 {
1089   if (ctx->q) {
1090     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1091 
1092     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1093 
1094     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095       gst_element_set_locked_state (ctx->q, TRUE);
1096       gst_element_set_state (ctx->q, GST_STATE_NULL);
1097       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098       gst_object_unref (parent);
1099     }
1100     gst_object_unref (ctx->q);
1101   }
1102   gst_object_unref (ctx->sinkpad);
1103   gst_object_unref (ctx->srcpad);
1104   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105   g_queue_clear (&ctx->queued_bufs);
1106   g_free (ctx);
1107 }
1108 
1109 static void
send_fragment_opened_closed_msg(GstSplitMuxSink * splitmux,gboolean opened,GstElement * sink)1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111     GstElement * sink)
1112 {
1113   gchar *location = NULL;
1114   GstMessage *msg;
1115   const gchar *msg_name = opened ?
1116       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118 
1119   if (!opened) {
1120     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1121     if (rtime)
1122       running_time = *rtime;
1123   }
1124 
1125   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126           "location") != NULL)
1127     g_object_get (sink, "location", &location, NULL);
1128 
1129   GST_DEBUG_OBJECT (splitmux,
1130       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1132 
1133   /* If it's in the middle of a teardown, the reference_ctc might have become
1134    * NULL */
1135   if (splitmux->reference_ctx) {
1136     msg = gst_message_new_element (GST_OBJECT (splitmux),
1137         gst_structure_new (msg_name,
1138             "location", G_TYPE_STRING, location,
1139             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140             "sink", GST_TYPE_ELEMENT, sink, NULL));
1141     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1142   }
1143 
1144   g_free (location);
1145 }
1146 
1147 static void
send_eos_async(GstSplitMuxSink * splitmux,AsyncEosHelper * helper)1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1149 {
1150   GstEvent *eos;
1151   GstPad *pad;
1152   MqStreamCtx *ctx;
1153 
1154   eos = gst_event_new_eos ();
1155   pad = helper->pad;
1156   ctx = helper->ctx;
1157 
1158   GST_SPLITMUX_LOCK (splitmux);
1159   if (!pad)
1160     pad = gst_pad_get_peer (ctx->srcpad);
1161   GST_SPLITMUX_UNLOCK (splitmux);
1162 
1163   gst_pad_send_event (pad, eos);
1164   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1165 
1166   gst_object_unref (pad);
1167   g_free (helper);
1168 }
1169 
1170 /* Called with lock held, drops the lock to send EOS to the
1171  * pad
1172  */
1173 static void
send_eos(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1175 {
1176   GstEvent *eos;
1177   GstPad *pad;
1178 
1179   eos = gst_event_new_eos ();
1180   pad = gst_pad_get_peer (ctx->srcpad);
1181 
1182   ctx->out_eos = TRUE;
1183 
1184   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185   GST_SPLITMUX_UNLOCK (splitmux);
1186   gst_pad_send_event (pad, eos);
1187   GST_SPLITMUX_LOCK (splitmux);
1188 
1189   gst_object_unref (pad);
1190 }
1191 
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193  * to happen in another thread */
1194 static void
eos_context_async(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1196 {
1197   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198   GstPad *srcpad, *sinkpad;
1199 
1200   srcpad = ctx->srcpad;
1201   sinkpad = gst_pad_get_peer (srcpad);
1202 
1203   helper->ctx = ctx;
1204   helper->pad = sinkpad;        /* Takes the reference */
1205 
1206   ctx->out_eos_async_done = TRUE;
1207 
1208   /* There used to be a bug here, where we had to explicitly remove
1209    * the SINK flag so that GstBin would ignore it for EOS purposes.
1210    * That fixed a race where if splitmuxsink really reaches EOS
1211    * before an asynchronous background element has finished, then
1212    * the bin wouldn't actually send EOS to the pipeline. Even after
1213    * finishing and removing the old element, the bin didn't re-check
1214    * EOS status on removing a SINK element. That bug was fixed
1215    * in core. */
1216   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217       sinkpad, ctx);
1218 
1219   g_assert_nonnull (helper->pad);
1220   gst_element_call_async (GST_ELEMENT (splitmux),
1221       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 }
1223 
1224 /* Called with lock held. TRUE iff all contexts have a
1225  * pending (or delivered) async eos event */
1226 static gboolean
all_contexts_are_async_eos(GstSplitMuxSink * splitmux)1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1228 {
1229   gboolean ret = TRUE;
1230   GList *item;
1231 
1232   for (item = splitmux->contexts; item; item = item->next) {
1233     MqStreamCtx *ctx = item->data;
1234     ret &= ctx->out_eos_async_done;
1235   }
1236   return ret;
1237 }
1238 
1239 /* Called with splitmux lock held to check if this output
1240  * context needs to sleep to wait for the release of the
1241  * next GOP, or to send EOS to close out the current file
1242  */
1243 static GstFlowReturn
complete_or_wait_on_out(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1245 {
1246   if (ctx->caps_change)
1247     return GST_FLOW_OK;
1248 
1249   do {
1250     /* When first starting up, the reference stream has to output
1251      * the first buffer to prepare the muxer and sink */
1252     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1254 
1255     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256         && my_max_out_running_time != G_MAXINT64) {
1257       my_max_out_running_time -= splitmux->alignment_threshold;
1258       GST_LOG_OBJECT (ctx->srcpad,
1259           "Max out running time currently %" GST_STIME_FORMAT
1260           ", with threshold applied it is %" GST_STIME_FORMAT,
1261           GST_STIME_ARGS (splitmux->max_out_running_time),
1262           GST_STIME_ARGS (my_max_out_running_time));
1263     }
1264 
1265     if (ctx->flushing
1266         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267       return GST_FLOW_FLUSHING;
1268 
1269     GST_LOG_OBJECT (ctx->srcpad,
1270         "Checking running time %" GST_STIME_FORMAT " against max %"
1271         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272         GST_STIME_ARGS (my_max_out_running_time));
1273 
1274     if (can_output) {
1275       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276           ctx->out_running_time < my_max_out_running_time) {
1277         return GST_FLOW_OK;
1278       }
1279 
1280       switch (splitmux->output_state) {
1281         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282           /* We only get here if we've finished outputting a GOP and need to know
1283            * what to do next */
1284           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286           continue;
1287 
1288         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290           /* We've reached the max out running_time to get here, so end this file now */
1291           if (ctx->out_eos == FALSE) {
1292             if (splitmux->async_finalize) {
1293               /* We must set EOS asynchronously at this point. We cannot defer
1294                * it, because we need all contexts to wake up, for the
1295                * reference context to eventually give us something at
1296                * START_NEXT_FILE. Otherwise, collectpads might choose another
1297                * context to give us the first buffer, and format-location-full
1298                * will not contain a valid sample. */
1299               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300                   GINT_TO_POINTER (1));
1301               eos_context_async (ctx, splitmux);
1302               if (all_contexts_are_async_eos (splitmux)) {
1303                 GST_INFO_OBJECT (splitmux,
1304                     "All contexts are async_eos. Moving to the next file.");
1305                 /* We can start the next file once we've asked each pad to go EOS */
1306                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1308                 continue;
1309               }
1310             } else {
1311               send_eos (splitmux, ctx);
1312               continue;
1313             }
1314           } else {
1315             GST_INFO_OBJECT (splitmux,
1316                 "At end-of-file state, but context %p is already EOS", ctx);
1317           }
1318           break;
1319         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320           if (ctx->is_reference) {
1321             GstFlowReturn ret = GST_FLOW_OK;
1322 
1323             /* Special handling on the reference ctx to start new fragments
1324              * and collect commands from the command queue */
1325             /* drops the splitmux lock briefly: */
1326             /* We must have reference ctx in order for format-location-full to
1327              * have a sample */
1328             ret = start_next_fragment (splitmux, ctx);
1329             if (ret != GST_FLOW_OK)
1330               return ret;
1331 
1332             continue;
1333           }
1334           break;
1335         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1336           do {
1337             SplitMuxOutputCommand *cmd =
1338                 g_queue_pop_tail (&splitmux->out_cmd_q);
1339             if (cmd != NULL) {
1340               /* If we pop the last command, we need to make our queues bigger */
1341               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342                 grow_blocked_queues (splitmux);
1343 
1344               if (cmd->start_new_fragment) {
1345                 if (splitmux->muxed_out_bytes > 0) {
1346                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1348                 } else {
1349                   GST_DEBUG_OBJECT (splitmux,
1350                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351                 }
1352               } else {
1353                 GST_DEBUG_OBJECT (splitmux,
1354                     "Got new output cmd for time %" GST_STIME_FORMAT,
1355                     GST_STIME_ARGS (cmd->max_output_ts));
1356 
1357                 /* Extend the output range immediately */
1358                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359                     || cmd->max_output_ts > splitmux->max_out_running_time)
1360                   splitmux->max_out_running_time = cmd->max_output_ts;
1361                 GST_DEBUG_OBJECT (splitmux,
1362                     "Max out running time now %" GST_STIME_FORMAT,
1363                     GST_STIME_ARGS (splitmux->max_out_running_time));
1364                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1365               }
1366               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1367 
1368               out_cmd_buf_free (cmd);
1369               break;
1370             } else {
1371               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1372             }
1373           } while (!ctx->flushing && splitmux->output_state ==
1374               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375           /* loop and re-check the state */
1376           continue;
1377         }
1378         case SPLITMUX_OUTPUT_STATE_STOPPED:
1379           return GST_FLOW_FLUSHING;
1380       }
1381     } else {
1382       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383     }
1384 
1385     GST_INFO_OBJECT (ctx->srcpad,
1386         "Sleeping for running time %"
1387         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388         GST_STIME_ARGS (ctx->out_running_time),
1389         GST_STIME_ARGS (splitmux->max_out_running_time));
1390     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391     GST_INFO_OBJECT (ctx->srcpad,
1392         "Woken for new max running time %" GST_STIME_FORMAT,
1393         GST_STIME_ARGS (splitmux->max_out_running_time));
1394   }
1395   while (1);
1396 
1397   return GST_FLOW_OK;
1398 }
1399 
1400 static GstClockTime
calculate_next_max_timecode(GstSplitMuxSink * splitmux,const GstVideoTimeCode * cur_tc,GstClockTime running_time,GstVideoTimeCode ** next_tc)1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403     GstVideoTimeCode ** next_tc)
1404 {
1405   GstVideoTimeCode *target_tc;
1406   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1407 
1408   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409     return GST_CLOCK_TIME_NONE;
1410 
1411   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1412   if (!target_tc) {
1413     GST_ELEMENT_ERROR (splitmux,
1414         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415     return GST_CLOCK_TIME_NONE;
1416   }
1417 
1418   /* Convert to ns */
1419   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1421 
1422   /* Add running_time, accounting for wraparound. */
1423   if (target_tc_time >= cur_tc_time) {
1424     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1425   } else {
1426     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1427 
1428     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429         (cur_tc->config.fps_d == 1001)) {
1430       /* Checking fps_d is probably unneeded, but better safe than sorry
1431        * (e.g. someone accidentally set a flag) */
1432       GstVideoTimeCode *tc_for_offset;
1433 
1434       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435        * but slightly less. Calculate that duration from a fake timecode. The
1436        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437        * is to add one frame to 23:59:59;29 */
1438       tc_for_offset =
1439           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440           NULL, cur_tc->config.flags, 23, 59, 59,
1441           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1442       day_in_ns =
1443           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445           cur_tc->config.fps_n);
1446       gst_video_time_code_free (tc_for_offset);
1447     }
1448     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449   }
1450 
1451 #ifndef GST_DISABLE_GST_DEBUG
1452   {
1453     gchar *next_max_tc_str, *cur_tc_str;
1454 
1455     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1457 
1458     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459         " from ref timecode %s time: %" GST_TIME_FORMAT,
1460         next_max_tc_str,
1461         GST_TIME_ARGS (next_max_tc_time),
1462         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1463 
1464     g_free (next_max_tc_str);
1465     g_free (cur_tc_str);
1466   }
1467 #endif
1468 
1469   if (next_tc)
1470     *next_tc = target_tc;
1471   else
1472     gst_video_time_code_free (target_tc);
1473 
1474   return next_max_tc_time;
1475 }
1476 
1477 static gboolean
request_next_keyframe(GstSplitMuxSink * splitmux,GstBuffer * buffer,GstClockTimeDiff running_time_dts)1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479     GstClockTimeDiff running_time_dts)
1480 {
1481   GstEvent *ev;
1482   GstClockTime target_time;
1483   gboolean timecode_based = FALSE;
1484   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488   InputGop *newest_gop = NULL;
1489   GList *l;
1490 
1491   if (!splitmux->send_keyframe_requests)
1492     return TRUE;
1493 
1494   /* Find the newest GOP where we passed in DTS the start PTS */
1495   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496     InputGop *tmp = l->data;
1497 
1498     GST_TRACE_OBJECT (splitmux,
1499         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500         " and start time %" GST_STIME_FORMAT,
1501         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1502 
1503     if (tmp->sent_fku) {
1504       GST_DEBUG_OBJECT (splitmux,
1505           "Already checked for a keyframe request for this GOP");
1506       return TRUE;
1507     }
1508 
1509     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511         running_time_dts >= tmp->start_time_pts) {
1512       GST_DEBUG_OBJECT (splitmux,
1513           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515           GST_STIME_ARGS (tmp->start_time));
1516       newest_gop = tmp;
1517       break;
1518     }
1519   }
1520 
1521   if (!newest_gop) {
1522     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1523     return TRUE;
1524   }
1525 
1526   if (splitmux->tc_interval) {
1527     if (newest_gop->start_tc
1528         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529       GstVideoTimeCode *next_tc = NULL;
1530       max_tc_time =
1531           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532           newest_gop->start_time, &next_tc);
1533 
1534       /* calculate the next expected keyframe time to prevent too early fku
1535        * event */
1536       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1537         next_max_tc_time =
1538             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539       }
1540       if (next_tc)
1541         gst_video_time_code_free (next_tc);
1542 
1543       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1545 
1546       if (!timecode_based) {
1547         GST_WARNING_OBJECT (splitmux,
1548             "Couldn't calculate maximum fragment time for timecode mode");
1549       }
1550     } else {
1551       /* This can happen in the presence of GAP events that trigger
1552        * a new fragment start */
1553       GST_WARNING_OBJECT (splitmux,
1554           "No buffer available to calculate next timecode");
1555     }
1556   }
1557 
1558   if ((splitmux->threshold_time == 0 && !timecode_based)
1559       || splitmux->threshold_bytes != 0)
1560     return TRUE;
1561 
1562   if (timecode_based) {
1563     /* We might have rounding errors: aim slightly earlier */
1564     if (max_tc_time >= tc_rounding_error) {
1565       target_time = max_tc_time - tc_rounding_error;
1566     } else {
1567       /* unreliable target time */
1568       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569           " is smaller than allowed rounding error, set it to zero",
1570           GST_TIME_ARGS (max_tc_time));
1571       target_time = 0;
1572     }
1573 
1574     if (next_max_tc_time >= tc_rounding_error) {
1575       next_fku_time = next_max_tc_time - tc_rounding_error;
1576     } else {
1577       /* unreliable target time */
1578       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579           " is smaller than allowed rounding error, set it to zero",
1580           GST_TIME_ARGS (next_max_tc_time));
1581       next_fku_time = 0;
1582     }
1583   } else {
1584     target_time = newest_gop->start_time + splitmux->threshold_time;
1585   }
1586 
1587   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588     GstClockTime allowed_time = splitmux->next_fku_time;
1589 
1590     if (timecode_based) {
1591       if (allowed_time >= tc_rounding_error) {
1592         allowed_time -= tc_rounding_error;
1593       } else {
1594         /* unreliable next force key unit time */
1595         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1596             GST_TIME_FORMAT
1597             " is smaller than allowed rounding error, set it to zero",
1598             GST_TIME_ARGS (splitmux->next_fku_time));
1599         allowed_time = 0;
1600       }
1601     }
1602 
1603     if (target_time < allowed_time) {
1604       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607           GST_TIME_ARGS (target_time),
1608           GST_TIME_ARGS (splitmux->next_fku_time),
1609           GST_TIME_ARGS (allowed_time));
1610 
1611       return TRUE;
1612     } else if (allowed_time != splitmux->next_fku_time &&
1613         target_time < splitmux->next_fku_time) {
1614       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616           ", but the difference is smaller than allowed rounding error",
1617           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1618     }
1619   }
1620 
1621   if (!timecode_based) {
1622     next_fku_time = target_time + splitmux->threshold_time;
1623   }
1624 
1625   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1628 
1629   newest_gop->sent_fku = TRUE;
1630 
1631   splitmux->next_fku_time = next_fku_time;
1632   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1633 
1634   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1635 }
1636 
1637 static GstPadProbeReturn
handle_mq_output(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1639 {
1640   GstSplitMuxSink *splitmux = ctx->splitmux;
1641   MqStreamBuf *buf_info = NULL;
1642   GstFlowReturn ret = GST_FLOW_OK;
1643 
1644   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1645 
1646   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648     g_warning ("Buffer list handling not implemented");
1649     return GST_PAD_PROBE_DROP;
1650   }
1651   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653     GstEvent *event = gst_pad_probe_info_get_event (info);
1654     gboolean locked = FALSE, wait = !ctx->is_reference;
1655 
1656     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1657 
1658     switch (GST_EVENT_TYPE (event)) {
1659       case GST_EVENT_SEGMENT:
1660         gst_event_copy_segment (event, &ctx->out_segment);
1661         break;
1662       case GST_EVENT_FLUSH_STOP:
1663         GST_SPLITMUX_LOCK (splitmux);
1664         locked = TRUE;
1665         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667         g_queue_clear (&ctx->queued_bufs);
1668         g_queue_clear (&ctx->queued_bufs);
1669         /* If this is the reference context, we just threw away any queued keyframes */
1670         if (ctx->is_reference)
1671           splitmux->queued_keyframes = 0;
1672         ctx->flushing = FALSE;
1673         wait = FALSE;
1674         break;
1675       case GST_EVENT_FLUSH_START:
1676         GST_SPLITMUX_LOCK (splitmux);
1677         locked = TRUE;
1678         GST_LOG_OBJECT (pad, "Flush start");
1679         ctx->flushing = TRUE;
1680         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1682         break;
1683       case GST_EVENT_EOS:
1684         GST_SPLITMUX_LOCK (splitmux);
1685         locked = TRUE;
1686         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1687           goto beach;
1688         ctx->out_eos = TRUE;
1689 
1690         if (ctx == splitmux->reference_ctx) {
1691           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1693         }
1694 
1695         GST_INFO_OBJECT (splitmux,
1696             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1697         break;
1698       case GST_EVENT_GAP:{
1699         GstClockTime gap_ts;
1700         GstClockTimeDiff rtime;
1701 
1702         gst_event_parse_gap (event, &gap_ts, NULL);
1703         if (gap_ts == GST_CLOCK_TIME_NONE)
1704           break;
1705 
1706         GST_SPLITMUX_LOCK (splitmux);
1707         locked = TRUE;
1708 
1709         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1710           goto beach;
1711 
1712         /* When we get a gap event on the
1713          * reference stream and we're trying to open a
1714          * new file, we need to store it until we get
1715          * the buffer afterwards
1716          */
1717         if (ctx->is_reference &&
1718             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720           gst_event_replace (&ctx->pending_gap, event);
1721           GST_SPLITMUX_UNLOCK (splitmux);
1722           return GST_PAD_PROBE_HANDLED;
1723         }
1724 
1725         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1726 
1727         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728             GST_STIME_ARGS (rtime));
1729 
1730         if (rtime != GST_CLOCK_STIME_NONE) {
1731           ctx->out_running_time = rtime;
1732           complete_or_wait_on_out (splitmux, ctx);
1733         }
1734         break;
1735       }
1736       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737         const GstStructure *s;
1738         GstClockTimeDiff ts = 0;
1739 
1740         s = gst_event_get_structure (event);
1741         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1742           break;
1743 
1744         gst_structure_get_int64 (s, "timestamp", &ts);
1745 
1746         GST_SPLITMUX_LOCK (splitmux);
1747         locked = TRUE;
1748 
1749         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1750           goto beach;
1751         ctx->out_running_time = ts;
1752         if (!ctx->is_reference)
1753           ret = complete_or_wait_on_out (splitmux, ctx);
1754         GST_SPLITMUX_UNLOCK (splitmux);
1755         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756         return GST_PAD_PROBE_DROP;
1757       }
1758       case GST_EVENT_CAPS:{
1759         GstPad *peer;
1760 
1761         if (!ctx->is_reference)
1762           break;
1763 
1764         peer = gst_pad_get_peer (pad);
1765         if (peer) {
1766           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1767 
1768           gst_object_unref (peer);
1769 
1770           if (ok)
1771             break;
1772 
1773         } else {
1774           break;
1775         }
1776         /* This is in the case the muxer doesn't allow this change of caps */
1777         GST_SPLITMUX_LOCK (splitmux);
1778         locked = TRUE;
1779         ctx->caps_change = TRUE;
1780 
1781         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782           GST_DEBUG_OBJECT (splitmux,
1783               "New caps were not accepted. Switching output file");
1784           if (ctx->out_eos == FALSE) {
1785             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1787           }
1788         }
1789 
1790         /* Lets it fall through, if it fails again, then the muxer just can't
1791          * support this format, but at least we have a closed file.
1792          */
1793         break;
1794       }
1795       default:
1796         break;
1797     }
1798 
1799     /* We need to make sure events aren't passed
1800      * until the muxer / sink are ready for it */
1801     if (!locked)
1802       GST_SPLITMUX_LOCK (splitmux);
1803     if (wait)
1804       ret = complete_or_wait_on_out (splitmux, ctx);
1805     GST_SPLITMUX_UNLOCK (splitmux);
1806 
1807     /* Don't try to forward sticky events before the next buffer is there
1808      * because it would cause a new file to be created without the first
1809      * buffer being available.
1810      */
1811     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813       gst_event_unref (event);
1814       return GST_PAD_PROBE_HANDLED;
1815     } else {
1816       return GST_PAD_PROBE_PASS;
1817     }
1818   }
1819 
1820   /* Allow everything through until the configured next stopping point */
1821   GST_SPLITMUX_LOCK (splitmux);
1822 
1823   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824   if (buf_info == NULL) {
1825     /* Can only happen due to a poorly timed flush */
1826     ret = GST_FLOW_FLUSHING;
1827     goto beach;
1828   }
1829 
1830   /* If we have popped a keyframe, decrement the queued_gop count */
1831   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832     splitmux->queued_keyframes--;
1833 
1834   ctx->out_running_time = buf_info->run_ts;
1835   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1836 
1837   GST_LOG_OBJECT (splitmux,
1838       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839       " size %" G_GUINT64_FORMAT,
1840       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1841 
1842   ctx->caps_change = FALSE;
1843 
1844   ret = complete_or_wait_on_out (splitmux, ctx);
1845 
1846   splitmux->muxed_out_bytes += buf_info->buf_size;
1847 
1848 #ifndef GST_DISABLE_GST_DEBUG
1849   {
1850     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852         " run ts %" GST_STIME_FORMAT, buf,
1853         GST_STIME_ARGS (ctx->out_running_time));
1854   }
1855 #endif
1856 
1857   ctx->cur_out_buffer = NULL;
1858   GST_SPLITMUX_UNLOCK (splitmux);
1859 
1860   /* pending_gap is protected by the STREAM lock */
1861   if (ctx->pending_gap) {
1862     /* If we previously stored a gap event, send it now */
1863     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1864 
1865     GST_DEBUG_OBJECT (splitmux,
1866         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1867 
1868     gst_pad_send_event (peer, ctx->pending_gap);
1869     ctx->pending_gap = NULL;
1870 
1871     gst_object_unref (peer);
1872   }
1873 
1874   mq_stream_buf_free (buf_info);
1875 
1876   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877   return GST_PAD_PROBE_PASS;
1878 
1879 beach:
1880   GST_SPLITMUX_UNLOCK (splitmux);
1881   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882   return GST_PAD_PROBE_DROP;
1883 }
1884 
1885 static gboolean
resend_sticky(GstPad * pad,GstEvent ** event,GstPad * peer)1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1887 {
1888   return gst_pad_send_event (peer, gst_event_ref (*event));
1889 }
1890 
1891 static void
unlock_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1893 {
1894   if (ctx->fragment_block_id > 0) {
1895     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896     ctx->fragment_block_id = 0;
1897   }
1898 }
1899 
1900 static void
restart_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1902 {
1903   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1904 
1905   gst_pad_sticky_events_foreach (ctx->srcpad,
1906       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1907 
1908   /* Clear EOS flag if not actually EOS */
1909   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910   ctx->out_eos_async_done = ctx->out_eos;
1911 
1912   gst_object_unref (peer);
1913 }
1914 
1915 static void
relink_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1917 {
1918   GstPad *sinkpad, *srcpad, *newpad;
1919   GstPadTemplate *templ;
1920 
1921   srcpad = ctx->srcpad;
1922   sinkpad = gst_pad_get_peer (srcpad);
1923 
1924   templ = sinkpad->padtemplate;
1925   newpad =
1926       gst_element_request_pad (splitmux->muxer, templ,
1927       GST_PAD_NAME (sinkpad), NULL);
1928 
1929   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1930       newpad);
1931   if (!gst_pad_unlink (srcpad, sinkpad)) {
1932     gst_object_unref (sinkpad);
1933     goto fail;
1934   }
1935   if (gst_pad_link_full (srcpad, newpad,
1936           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937     gst_element_release_request_pad (splitmux->muxer, newpad);
1938     gst_object_unref (sinkpad);
1939     gst_object_unref (newpad);
1940     goto fail;
1941   }
1942   gst_object_unref (newpad);
1943   gst_object_unref (sinkpad);
1944   return;
1945 
1946 fail:
1947   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948       ("Could not create the new muxer/sink"), NULL);
1949 }
1950 
1951 static GstPadProbeReturn
_block_pad(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1953 {
1954   return GST_PAD_PROBE_OK;
1955 }
1956 
1957 static void
block_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1959 {
1960   ctx->fragment_block_id =
1961       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1962       NULL, NULL);
1963 }
1964 
1965 static gboolean
_set_property_from_structure(GQuark field_id,const GValue * value,gpointer user_data)1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1967     gpointer user_data)
1968 {
1969   const gchar *property_name = g_quark_to_string (field_id);
1970   GObject *element = G_OBJECT (user_data);
1971 
1972   g_object_set_property (element, property_name, value);
1973 
1974   return TRUE;
1975 }
1976 
1977 static void
_lock_and_set_to_null(GstElement * element,GstSplitMuxSink * splitmux)1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1979 {
1980   gst_element_set_locked_state (element, TRUE);
1981   gst_element_set_state (element, GST_STATE_NULL);
1982   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983   gst_bin_remove (GST_BIN (splitmux), element);
1984 }
1985 
1986 
1987 static void
_send_event(const GValue * value,gpointer user_data)1988 _send_event (const GValue * value, gpointer user_data)
1989 {
1990   GstPad *pad = g_value_get_object (value);
1991   GstEvent *ev = user_data;
1992 
1993   gst_pad_send_event (pad, gst_event_ref (ev));
1994 }
1995 
1996 /* Called with lock held when a fragment
1997  * reaches EOS and it is time to restart
1998  * a new fragment
1999  */
2000 static GstFlowReturn
start_next_fragment(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2002 {
2003   GstElement *muxer, *sink;
2004 
2005   g_assert (ctx->is_reference);
2006 
2007   /* 1 change to new file */
2008   splitmux->switching_fragment = TRUE;
2009 
2010   /* We need to drop the splitmux lock to acquire the state lock
2011    * here and ensure there's no racy state change going on elsewhere */
2012   muxer = gst_object_ref (splitmux->muxer);
2013   sink = gst_object_ref (splitmux->active_sink);
2014 
2015   GST_SPLITMUX_UNLOCK (splitmux);
2016   GST_SPLITMUX_STATE_LOCK (splitmux);
2017 
2018   if (splitmux->shutdown) {
2019     GST_DEBUG_OBJECT (splitmux,
2020         "Shutdown requested. Aborting fragment switch.");
2021     GST_SPLITMUX_LOCK (splitmux);
2022     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023     gst_object_unref (muxer);
2024     gst_object_unref (sink);
2025     return GST_FLOW_FLUSHING;
2026   }
2027 
2028   if (splitmux->async_finalize) {
2029     if (splitmux->muxed_out_bytes > 0
2030         || splitmux->fragment_id != splitmux->start_index) {
2031       gchar *newname;
2032       GstElement *new_sink, *new_muxer;
2033 
2034       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035           splitmux->fragment_id);
2036       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038       GST_SPLITMUX_LOCK (splitmux);
2039       if ((splitmux->sink =
2040               create_element (splitmux, splitmux->sink_factory, newname,
2041                   TRUE)) == NULL)
2042         goto fail;
2043       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045             splitmux->sink_preset);
2046       if (splitmux->sink_properties)
2047         gst_structure_foreach (splitmux->sink_properties,
2048             _set_property_from_structure, splitmux->sink);
2049       splitmux->active_sink = splitmux->sink;
2050       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2051       g_free (newname);
2052       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053       if ((splitmux->muxer =
2054               create_element (splitmux, splitmux->muxer_factory, newname,
2055                   TRUE)) == NULL)
2056         goto fail;
2057       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2058               "async") != NULL) {
2059         /* async child elements are causing state change races and weird
2060          * failures, so let's try and turn that off */
2061         g_object_set (splitmux->sink, "async", FALSE, NULL);
2062       }
2063       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065             splitmux->muxer_preset);
2066       if (splitmux->muxer_properties)
2067         gst_structure_foreach (splitmux->muxer_properties,
2068             _set_property_from_structure, splitmux->muxer);
2069       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2070       g_free (newname);
2071       new_sink = splitmux->sink;
2072       new_muxer = splitmux->muxer;
2073       GST_SPLITMUX_UNLOCK (splitmux);
2074       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075       gst_element_link (new_muxer, new_sink);
2076 
2077       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079                     EOS_FROM_US)) == 2) {
2080           _lock_and_set_to_null (muxer, splitmux);
2081           _lock_and_set_to_null (sink, splitmux);
2082         } else {
2083           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084               GINT_TO_POINTER (2));
2085         }
2086       }
2087       gst_object_unref (muxer);
2088       gst_object_unref (sink);
2089       muxer = new_muxer;
2090       sink = new_sink;
2091       gst_object_ref (muxer);
2092       gst_object_ref (sink);
2093     }
2094   } else {
2095 
2096     gst_element_set_locked_state (muxer, TRUE);
2097     gst_element_set_locked_state (sink, TRUE);
2098     gst_element_set_state (sink, GST_STATE_NULL);
2099 
2100     if (splitmux->reset_muxer) {
2101       gst_element_set_state (muxer, GST_STATE_NULL);
2102     } else {
2103       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2104       GstEvent *ev;
2105       guint32 seqnum;
2106 
2107       ev = gst_event_new_flush_start ();
2108       seqnum = gst_event_get_seqnum (ev);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111 
2112       gst_iterator_resync (it);
2113 
2114       ev = gst_event_new_flush_stop (TRUE);
2115       gst_event_set_seqnum (ev, seqnum);
2116       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117       gst_event_unref (ev);
2118 
2119       gst_iterator_free (it);
2120     }
2121   }
2122 
2123   GST_SPLITMUX_LOCK (splitmux);
2124   set_next_filename (splitmux, ctx);
2125   splitmux->muxed_out_bytes = 0;
2126   GST_SPLITMUX_UNLOCK (splitmux);
2127 
2128   if (gst_element_set_state (sink,
2129           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130     gst_element_set_state (sink, GST_STATE_NULL);
2131     gst_element_set_locked_state (muxer, FALSE);
2132     gst_element_set_locked_state (sink, FALSE);
2133 
2134     goto fail_output;
2135   }
2136 
2137   if (gst_element_set_state (muxer,
2138           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139     gst_element_set_state (muxer, GST_STATE_NULL);
2140     gst_element_set_state (sink, GST_STATE_NULL);
2141     gst_element_set_locked_state (muxer, FALSE);
2142     gst_element_set_locked_state (sink, FALSE);
2143     goto fail_muxer;
2144   }
2145 
2146   gst_element_set_locked_state (muxer, FALSE);
2147   gst_element_set_locked_state (sink, FALSE);
2148 
2149   gst_object_unref (sink);
2150   gst_object_unref (muxer);
2151 
2152   GST_SPLITMUX_LOCK (splitmux);
2153   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154   splitmux->switching_fragment = FALSE;
2155   do_async_done (splitmux);
2156 
2157   splitmux->ready_for_output = TRUE;
2158 
2159   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2161 
2162   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2163 
2164   /* FIXME: Is this always the correct next state? */
2165   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2168   return GST_FLOW_OK;
2169 
2170 fail:
2171   gst_object_unref (sink);
2172   gst_object_unref (muxer);
2173 
2174   GST_SPLITMUX_LOCK (splitmux);
2175   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177       ("Could not create the new muxer/sink"), NULL);
2178   return GST_FLOW_ERROR;
2179 
2180 fail_output:
2181   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182       ("Could not start new output sink"), NULL);
2183   gst_object_unref (sink);
2184   gst_object_unref (muxer);
2185 
2186   GST_SPLITMUX_LOCK (splitmux);
2187   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188   splitmux->switching_fragment = FALSE;
2189   return GST_FLOW_ERROR;
2190 
2191 fail_muxer:
2192   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193       ("Could not start new muxer"), NULL);
2194   gst_object_unref (sink);
2195   gst_object_unref (muxer);
2196 
2197   GST_SPLITMUX_LOCK (splitmux);
2198   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199   splitmux->switching_fragment = FALSE;
2200   return GST_FLOW_ERROR;
2201 }
2202 
2203 static void
bus_handler(GstBin * bin,GstMessage * message)2204 bus_handler (GstBin * bin, GstMessage * message)
2205 {
2206   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2207 
2208   switch (GST_MESSAGE_TYPE (message)) {
2209     case GST_MESSAGE_EOS:{
2210       /* If the state is draining out the current file, drop this EOS */
2211       GstElement *sink;
2212 
2213       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214       GST_SPLITMUX_LOCK (splitmux);
2215 
2216       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2217 
2218       if (splitmux->async_finalize) {
2219 
2220         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222                       EOS_FROM_US)) == 2) {
2223             GstElement *muxer;
2224             GstPad *sinksink, *muxersrc;
2225 
2226             sinksink = gst_element_get_static_pad (sink, "sink");
2227             muxersrc = gst_pad_get_peer (sinksink);
2228             muxer = gst_pad_get_parent_element (muxersrc);
2229             gst_object_unref (sinksink);
2230             gst_object_unref (muxersrc);
2231 
2232             gst_element_call_async (muxer,
2233                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234                 gst_object_ref (splitmux), gst_object_unref);
2235             gst_element_call_async (sink,
2236                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237                 gst_object_ref (splitmux), gst_object_unref);
2238             gst_object_unref (muxer);
2239           } else {
2240             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241                 GINT_TO_POINTER (2));
2242           }
2243           GST_DEBUG_OBJECT (splitmux,
2244               "Caught async EOS from previous muxer+sink. Dropping.");
2245           /* We forward the EOS so that it gets aggregated as normal. If the sink
2246            * finishes and is removed before the end, it will be de-aggregated */
2247           gst_message_unref (message);
2248           GST_SPLITMUX_UNLOCK (splitmux);
2249           return;
2250         }
2251       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252         GST_DEBUG_OBJECT (splitmux,
2253             "Passing EOS message. Output state %d max_out_running_time %"
2254             GST_STIME_FORMAT, splitmux->output_state,
2255             GST_STIME_ARGS (splitmux->max_out_running_time));
2256       } else {
2257         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2260 
2261         gst_message_unref (message);
2262         GST_SPLITMUX_UNLOCK (splitmux);
2263         return;
2264       }
2265       GST_SPLITMUX_UNLOCK (splitmux);
2266       break;
2267     }
2268     case GST_MESSAGE_ASYNC_START:
2269     case GST_MESSAGE_ASYNC_DONE:
2270       /* Ignore state changes from our children while switching */
2271       GST_SPLITMUX_LOCK (splitmux);
2272       if (splitmux->switching_fragment) {
2273         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275           GST_LOG_OBJECT (splitmux,
2276               "Ignoring state change from child %" GST_PTR_FORMAT
2277               " while switching", GST_MESSAGE_SRC (message));
2278           gst_message_unref (message);
2279           GST_SPLITMUX_UNLOCK (splitmux);
2280           return;
2281         }
2282       }
2283       GST_SPLITMUX_UNLOCK (splitmux);
2284       break;
2285     case GST_MESSAGE_WARNING:
2286     {
2287       GError *gerror = NULL;
2288 
2289       gst_message_parse_warning (message, &gerror, NULL);
2290 
2291       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2292         GList *item;
2293         gboolean caps_change = FALSE;
2294 
2295         GST_SPLITMUX_LOCK (splitmux);
2296 
2297         for (item = splitmux->contexts; item; item = item->next) {
2298           MqStreamCtx *ctx = item->data;
2299 
2300           if (ctx->caps_change) {
2301             caps_change = TRUE;
2302             break;
2303           }
2304         }
2305 
2306         GST_SPLITMUX_UNLOCK (splitmux);
2307 
2308         if (caps_change) {
2309           GST_LOG_OBJECT (splitmux,
2310               "Ignoring warning change from child %" GST_PTR_FORMAT
2311               " while switching caps", GST_MESSAGE_SRC (message));
2312           gst_message_unref (message);
2313           return;
2314         }
2315       }
2316       break;
2317     }
2318     default:
2319       break;
2320   }
2321 
2322   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2323 }
2324 
2325 static void
ctx_set_unblock(MqStreamCtx * ctx)2326 ctx_set_unblock (MqStreamCtx * ctx)
2327 {
2328   ctx->need_unblock = TRUE;
2329 }
2330 
2331 static gboolean
need_new_fragment(GstSplitMuxSink * splitmux,GstClockTime queued_time,GstClockTime queued_gop_time,guint64 queued_bytes)2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333     GstClockTime queued_time, GstClockTime queued_gop_time,
2334     guint64 queued_bytes)
2335 {
2336   guint64 thresh_bytes;
2337   GstClockTime thresh_time;
2338   gboolean check_robust_muxing;
2339   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340   GstClockTime *ptr_to_time;
2341   const InputGop *gop, *next_gop;
2342 
2343   GST_OBJECT_LOCK (splitmux);
2344   thresh_bytes = splitmux->threshold_bytes;
2345   thresh_time = splitmux->threshold_time;
2346   ptr_to_time = (GstClockTime *)
2347       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2348   if (ptr_to_time)
2349     time_to_split = *ptr_to_time;
2350   check_robust_muxing = splitmux->use_robust_muxing
2351       && splitmux->muxer_has_reserved_props;
2352   GST_OBJECT_UNLOCK (splitmux);
2353 
2354   /* Have we muxed at least one thing from the reference
2355    * stream into the file? If not, no other streams can have
2356    * either */
2357   if (splitmux->fragment_reference_bytes <= 0) {
2358     GST_TRACE_OBJECT (splitmux,
2359         "Not ready to split - nothing muxed on the reference stream");
2360     return FALSE;
2361   }
2362 
2363   /* User told us to split now */
2364   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2366     return TRUE;
2367   }
2368 
2369   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370   /* We need a full GOP queued up at this point */
2371   g_assert (gop != NULL);
2372   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373   /* And the beginning of the next GOP or otherwise EOS */
2374 
2375   /* User told us to split at this running time */
2376   if (gop->start_time >= time_to_split) {
2377     GST_OBJECT_LOCK (splitmux);
2378     /* Dequeue running time */
2379     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380     /* Empty any running times after this that are past now */
2381     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     while (ptr_to_time) {
2383       time_to_split = *ptr_to_time;
2384       if (gop->start_time < time_to_split) {
2385         break;
2386       }
2387       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2389     }
2390     GST_TRACE_OBJECT (splitmux,
2391         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393         GST_STIME_ARGS (time_to_split));
2394     GST_OBJECT_UNLOCK (splitmux);
2395     return TRUE;
2396   }
2397 
2398   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399     GST_TRACE_OBJECT (splitmux,
2400         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401     return TRUE;                /* Would overrun byte limit */
2402   }
2403 
2404   if (thresh_time > 0 && queued_time > thresh_time) {
2405     GST_TRACE_OBJECT (splitmux,
2406         "queued time %" GST_STIME_FORMAT " overruns time limit",
2407         GST_STIME_ARGS (queued_time));
2408     return TRUE;                /* Would overrun time limit */
2409   }
2410 
2411   if (splitmux->tc_interval) {
2412     GstClockTime next_gop_start_time =
2413         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2414 
2415     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417         next_gop_start_time >
2418         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419       GST_TRACE_OBJECT (splitmux,
2420           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2423       return TRUE;
2424     }
2425   }
2426 
2427   if (check_robust_muxing) {
2428     GstClockTime mux_reserved_remain;
2429 
2430     g_object_get (splitmux->muxer,
2431         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2432 
2433     GST_LOG_OBJECT (splitmux,
2434         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436         mux_reserved_remain, queued_gop_time);
2437 
2438     if (queued_gop_time >= mux_reserved_remain) {
2439       GST_INFO_OBJECT (splitmux,
2440           "File is about to run out of header room - %" G_GUINT64_FORMAT
2441           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2443       return TRUE;
2444     }
2445   }
2446 
2447   /* Continue and mux this GOP */
2448   return FALSE;
2449 }
2450 
2451 /* probably we want to add this API? */
2452 static void
video_time_code_replace(GstVideoTimeCode ** old_tc,GstVideoTimeCode * new_tc)2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2454 {
2455   GstVideoTimeCode *timecode = NULL;
2456 
2457   g_return_if_fail (old_tc != NULL);
2458 
2459   if (*old_tc == new_tc)
2460     return;
2461 
2462   if (new_tc)
2463     timecode = gst_video_time_code_copy (new_tc);
2464 
2465   if (*old_tc)
2466     gst_video_time_code_free (*old_tc);
2467 
2468   *old_tc = timecode;
2469 }
2470 
2471 #ifdef OHOS_OPT_COMPAT
2472 // ohos.opt.compat.0039
2473 // Ignore the error and continue to execute, waiting for EOS
continue_execution(GstSplitMuxSink * splitmux,GstClockTimeDiff max_out_running_time)2474 static void continue_execution (GstSplitMuxSink * splitmux, GstClockTimeDiff max_out_running_time)
2475 {
2476   SplitMuxOutputCommand *cmd;
2477 
2478   if (splitmux) {
2479     // Make handle_mq_Input is not blocked
2480     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2481     // Wake up handle_mq_output
2482     cmd = out_cmd_buf_new ();
2483     cmd->start_new_fragment = FALSE;
2484     cmd->max_output_ts = max_out_running_time;
2485     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2486     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2487   }
2488 
2489   return;
2490 }
2491 #endif
2492 
2493 /* Called with splitmux lock held */
2494 /* Called when entering ProcessingCompleteGop state
2495  * Assess if mq contents overflowed the current file
2496  *   -> If yes, need to switch to new file
2497  *   -> if no, set max_out_running_time to let this GOP in and
2498  *      go to COLLECTING_GOP_START state
2499  */
2500 static void
handle_gathered_gop(GstSplitMuxSink * splitmux,const InputGop * gop,GstClockTimeDiff next_gop_start_time,GstClockTimeDiff max_out_running_time)2501 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2502     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2503 {
2504   guint64 queued_bytes;
2505   GstClockTimeDiff queued_time = 0;
2506   GstClockTimeDiff queued_gop_time = 0;
2507   SplitMuxOutputCommand *cmd;
2508 
2509   /* Assess if the multiqueue contents overflowed the current file */
2510   /* When considering if a newly gathered GOP overflows
2511    * the time limit for the file, only consider the running time of the
2512    * reference stream. Other streams might have run ahead a little bit,
2513    * but extra pieces won't be released to the muxer beyond the reference
2514    * stream cut-off anyway - so it forms the limit. */
2515   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2516   queued_time = next_gop_start_time;
2517   /* queued_gop_time tracks how much unwritten data there is waiting to
2518    * be written to this fragment including this GOP */
2519   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2520     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2521   else
2522     queued_gop_time = queued_time - gop->start_time;
2523 
2524   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2525   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2526       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2527       " gop start time %" GST_STIME_FORMAT,
2528       GST_STIME_ARGS (queued_time), queued_bytes,
2529       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2530 
2531   if (queued_gop_time < 0)
2532     goto error_gop_duration;
2533 
2534   if (queued_time < splitmux->fragment_start_time)
2535     goto error_queued_time;
2536 
2537   queued_time -= splitmux->fragment_start_time;
2538   if (queued_time < queued_gop_time)
2539     queued_gop_time = queued_time;
2540 
2541   /* Expand queued bytes estimate by muxer overhead */
2542   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2543 
2544   /* Check for overrun - have we output at least one byte and overrun
2545    * either threshold? */
2546   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2547     if (splitmux->async_finalize) {
2548       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2549       *sink_running_time = splitmux->reference_ctx->out_running_time;
2550       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2551           RUNNING_TIME, sink_running_time, g_free);
2552     }
2553     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2554     /* Tell the output side to start a new fragment */
2555     GST_INFO_OBJECT (splitmux,
2556         "This GOP (dur %" GST_STIME_FORMAT
2557         ") would overflow the fragment, Sending start_new_fragment cmd",
2558         GST_STIME_ARGS (queued_gop_time));
2559     cmd = out_cmd_buf_new ();
2560     cmd->start_new_fragment = TRUE;
2561     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2562     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2563 
2564     splitmux->fragment_start_time = gop->start_time;
2565     splitmux->fragment_start_time_pts = gop->start_time_pts;
2566     splitmux->fragment_total_bytes = 0;
2567     splitmux->fragment_reference_bytes = 0;
2568 
2569     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2570     splitmux->next_fragment_start_tc_time =
2571         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2572         splitmux->fragment_start_time, NULL);
2573     if (splitmux->tc_interval && splitmux->fragment_start_tc
2574         && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2575       GST_WARNING_OBJECT (splitmux,
2576           "Couldn't calculate next fragment start time for timecode mode");
2577     }
2578   }
2579 
2580   /* And set up to collect the next GOP */
2581   if (max_out_running_time != G_MAXINT64) {
2582     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2583   } else {
2584     /* This is probably already the current state, but just in case: */
2585     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2586   }
2587 
2588   /* And wake all input contexts to send a wake-up event */
2589   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2590   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2591 
2592   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2593   splitmux->fragment_total_bytes += gop->total_bytes;
2594   splitmux->fragment_reference_bytes += gop->reference_bytes;
2595 
2596   if (gop->total_bytes > 0) {
2597     GST_LOG_OBJECT (splitmux,
2598         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2599         " time %" GST_STIME_FORMAT,
2600         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2601 
2602     /* Send this GOP to the output command queue */
2603     cmd = out_cmd_buf_new ();
2604     cmd->start_new_fragment = FALSE;
2605     cmd->max_output_ts = max_out_running_time;
2606     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2607         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2608     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2609 
2610     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2611   }
2612 
2613   return;
2614 
2615 error_gop_duration:
2616 #ifdef OHOS_OPT_COMPAT
2617 // ohos.opt.compat.0039
2618   continue_execution(splitmux, max_out_running_time);
2619 #endif
2620   GST_ELEMENT_ERROR (splitmux,
2621       STREAM, FAILED, ("Timestamping error on input streams"),
2622       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2623           GST_STIME_ARGS (queued_gop_time)));
2624   return;
2625 error_queued_time:
2626 #ifdef OHOS_OPT_COMPAT
2627 // ohos.opt.compat.0039
2628   continue_execution(splitmux, max_out_running_time);
2629 #endif
2630   GST_ELEMENT_ERROR (splitmux,
2631       STREAM, FAILED, ("Timestamping error on input streams"),
2632       ("Queued time is negative. Input went backwards. queued_time - %"
2633           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2634   return;
2635 }
2636 
2637 /* Called with splitmux lock held */
2638 /* Called from each input pad when it is has all the pieces
2639  * for a GOP or EOS, starting with the reference pad which has set the
2640  * splitmux->max_in_running_time
2641  */
2642 static void
check_completed_gop(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)2643 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2644 {
2645   GList *cur;
2646   GstEvent *event;
2647 
2648   /* On ENDING_FILE, the reference stream sends a command to start a new
2649    * fragment, then releases the GOP for output in the new fragment.
2650    *  If some streams received no buffer during the last GOP that overran,
2651    * because its next buffer has a timestamp bigger than
2652    * ctx->max_in_running_time, its queue is empty. In that case the only
2653    * way to wakeup the output thread is by injecting an event in the
2654    * queue. This usually happen with subtitle streams.
2655    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2656   if (ctx->need_unblock) {
2657     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2658     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2659         GST_EVENT_TYPE_SERIALIZED,
2660         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2661             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2662 
2663     GST_SPLITMUX_UNLOCK (splitmux);
2664     gst_pad_send_event (ctx->sinkpad, event);
2665     GST_SPLITMUX_LOCK (splitmux);
2666 
2667     ctx->need_unblock = FALSE;
2668     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2669     /* state may have changed while we were unlocked. Loop again if so */
2670     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2671       return;
2672   }
2673 
2674   do {
2675     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2676 
2677     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2678       GstClockTimeDiff max_out_running_time;
2679       gboolean ready = TRUE;
2680       InputGop *gop;
2681       const InputGop *next_gop;
2682 
2683       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2684       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2685 
2686 #ifdef OHOS_OPT_COMPAT
2687       /* ohos.opt.stable.0045
2688        * When the plug-in does not switch to the playing state, there is no data passing by.
2689        * At this time, it receives the eos signal, and the gop and gops are null, resulting in an assertion error.
2690        * Set input_state to COLLECTING_GOP_START, put non-reference data source into sleep and stop receiving data */
2691       if ((gop == NULL) && (next_gop == NULL)) {
2692           GST_WARNING_OBJECT (splitmux, "Received eos signal. No further GOPs finished collecting");
2693           splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2694           return;
2695       }
2696 #endif
2697 
2698       /* If we have no GOP or no next GOP here then the reference context is
2699        * at EOS, otherwise use the start time of the next GOP if we're far
2700        * enough in the GOP to know it */
2701       if (gop && next_gop) {
2702         if (!splitmux->reference_ctx->in_eos
2703             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2704             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2705           GST_LOG_OBJECT (splitmux,
2706               "No further GOPs finished collecting, waiting until current DTS %"
2707               GST_STIME_FORMAT " has passed next GOP start PTS %"
2708               GST_STIME_FORMAT,
2709               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2710               GST_STIME_ARGS (next_gop->start_time_pts));
2711           break;
2712         }
2713 
2714         GST_LOG_OBJECT (splitmux,
2715             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2716             ", next GOP start time %" GST_STIME_FORMAT,
2717             GST_STIME_ARGS (gop->start_time),
2718             GST_STIME_ARGS (next_gop->start_time));
2719         next_gop_start = next_gop->start_time;
2720         max_out_running_time =
2721             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2722       } else if (!next_gop) {
2723         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2724         next_gop_start = splitmux->max_in_running_time;
2725         max_out_running_time = G_MAXINT64;
2726       } else if (!gop) {
2727         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2728         break;
2729       } else {
2730         g_assert_not_reached ();
2731       }
2732 
2733       g_assert (gop != NULL);
2734 
2735       /* Iterate each pad, and check that the input running time is at least
2736        * up to the start running time of the next GOP or EOS, and if so handle
2737        * the collected GOP */
2738       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2739           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2740       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2741           cur = g_list_next (cur)) {
2742         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2743 
2744         GST_LOG_OBJECT (splitmux,
2745             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2746             " EOS %d", tmpctx, tmpctx->sinkpad,
2747             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2748 
2749         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2750             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2751 #ifdef OHOS_OPT_COMPAT
2752 // ohos.opt.compat.0042
2753 // Prevent the gap between video pts and audio pts from being too large, which will lead to waiting all the time
2754             GstClockTimeDiff diff = next_gop_start - tmpctx->in_running_time;
2755             if (diff >= 2 * GST_SECOND) { // Error reported for more than 2s
2756                 GST_ELEMENT_ERROR (splitmux, STREAM, FAILED, ("Timestamping error on input streams"),
2757                     ("Context %p sink pad %" GST_PTR_FORMAT " running time %" GST_STIME_FORMAT
2758                     ", next gop start: %" GST_STIME_FORMAT ", diff %" GST_STIME_FORMAT,
2759                     tmpctx, tmpctx->sinkpad, GST_STIME_ARGS (tmpctx->in_running_time),
2760                     GST_STIME_ARGS (next_gop_start), GST_STIME_ARGS (diff)));
2761             } else if (diff >= 500 * GST_MSECOND) { // Record error log for more than 500ms
2762                 GST_ERROR_OBJECT (splitmux, "Context %p sink pad %" GST_PTR_FORMAT " running time %" GST_STIME_FORMAT
2763                     ", next gop start: %" GST_STIME_FORMAT ", diff %" GST_STIME_FORMAT,
2764                     tmpctx, tmpctx->sinkpad, GST_STIME_ARGS (tmpctx->in_running_time),
2765                     GST_STIME_ARGS (next_gop_start), GST_STIME_ARGS (diff));
2766             }
2767 #endif
2768           GST_LOG_OBJECT (splitmux,
2769               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2770               tmpctx, tmpctx->sinkpad);
2771           ready = FALSE;
2772           break;
2773         }
2774       }
2775       if (ready) {
2776         GST_DEBUG_OBJECT (splitmux,
2777             "Collected GOP is complete. Processing (ctx %p)", ctx);
2778         /* All pads have a complete GOP, release it into the multiqueue */
2779         handle_gathered_gop (splitmux, gop, next_gop_start,
2780             max_out_running_time);
2781 
2782         g_queue_pop_head (&splitmux->pending_input_gops);
2783         input_gop_free (gop);
2784 
2785         /* The user has requested a split, we can split now that the previous GOP
2786          * has been collected to the correct location */
2787         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2788                 TRUE, FALSE)) {
2789           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2790         }
2791       }
2792     }
2793 
2794     /* If upstream reached EOS we are not expecting more data, no need to wait
2795      * here. */
2796     if (ctx->in_eos)
2797       return;
2798 
2799     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2800         !ctx->flushing &&
2801         ctx->in_running_time >= next_gop_start &&
2802         next_gop_start != GST_CLOCK_STIME_NONE) {
2803       /* Some pad is not yet ready, or GOP is being pushed
2804        * either way, sleep and wait to get woken */
2805       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2806       GST_SPLITMUX_WAIT_INPUT (splitmux);
2807       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2808     } else {
2809       /* This pad is not ready or the state changed - break out and get another
2810        * buffer / event */
2811       break;
2812     }
2813   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2814 }
2815 
2816 static GstPadProbeReturn
handle_mq_input(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)2817 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2818 {
2819   GstSplitMuxSink *splitmux = ctx->splitmux;
2820   GstFlowReturn ret = GST_FLOW_OK;
2821   GstBuffer *buf;
2822   MqStreamBuf *buf_info = NULL;
2823   GstClockTime ts, pts, dts;
2824   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2825   gboolean loop_again;
2826   gboolean keyframe = FALSE;
2827 
2828   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2829 
2830   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2831   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2832     g_warning ("Buffer list handling not implemented");
2833     return GST_PAD_PROBE_DROP;
2834   }
2835   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2836       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2837     GstEvent *event = gst_pad_probe_info_get_event (info);
2838 
2839     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2840 
2841     switch (GST_EVENT_TYPE (event)) {
2842       case GST_EVENT_SEGMENT:
2843         gst_event_copy_segment (event, &ctx->in_segment);
2844         break;
2845       case GST_EVENT_FLUSH_STOP:
2846         GST_SPLITMUX_LOCK (splitmux);
2847         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2848         ctx->in_eos = FALSE;
2849         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2850         GST_SPLITMUX_UNLOCK (splitmux);
2851         break;
2852       case GST_EVENT_EOS:
2853         GST_SPLITMUX_LOCK (splitmux);
2854         ctx->in_eos = TRUE;
2855 
2856         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2857           ret = GST_FLOW_FLUSHING;
2858           goto beach;
2859         }
2860 
2861         if (ctx->is_reference) {
2862           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2863           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2864           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2865           /* Wake up other input pads to collect this GOP */
2866           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2867           check_completed_gop (splitmux, ctx);
2868         } else if (splitmux->input_state ==
2869             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2870           /* If we are waiting for a GOP to be completed (ie, for aux
2871            * pads to catch up), then this pad is complete, so check
2872            * if the whole GOP is.
2873            */
2874           check_completed_gop (splitmux, ctx);
2875         }
2876         GST_SPLITMUX_UNLOCK (splitmux);
2877         break;
2878       case GST_EVENT_GAP:{
2879         GstClockTime gap_ts;
2880         GstClockTimeDiff rtime;
2881 
2882         gst_event_parse_gap (event, &gap_ts, NULL);
2883         if (gap_ts == GST_CLOCK_TIME_NONE)
2884           break;
2885 
2886         GST_SPLITMUX_LOCK (splitmux);
2887 
2888         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2889           ret = GST_FLOW_FLUSHING;
2890           goto beach;
2891         }
2892         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2893 
2894         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2895             GST_STIME_ARGS (rtime));
2896 
2897         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2898           /* If this GAP event happens before the first fragment then
2899            * initialize the fragment start time here. */
2900           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2901             splitmux->fragment_start_time = rtime;
2902             GST_LOG_OBJECT (splitmux,
2903                 "Fragment start time now %" GST_STIME_FORMAT,
2904                 GST_STIME_ARGS (splitmux->fragment_start_time));
2905 
2906             /* Also take this as the first start time when starting up,
2907              * so that we start counting overflow from the first frame */
2908             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2909               splitmux->max_in_running_time = rtime;
2910             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2911               splitmux->max_in_running_time_dts = rtime;
2912           }
2913 
2914           /* Similarly take it as fragment start PTS and GOP start time if
2915            * these are not set */
2916           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2917             splitmux->fragment_start_time_pts = rtime;
2918 
2919           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2920             InputGop *gop = g_slice_new0 (InputGop);
2921 
2922             gop->from_gap = TRUE;
2923             gop->start_time = rtime;
2924             gop->start_time_pts = rtime;
2925 
2926             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2927           }
2928         }
2929 
2930         GST_SPLITMUX_UNLOCK (splitmux);
2931         break;
2932       }
2933       default:
2934         break;
2935     }
2936     return GST_PAD_PROBE_PASS;
2937   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2938     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2939       case GST_QUERY_ALLOCATION:
2940         return GST_PAD_PROBE_DROP;
2941       default:
2942         return GST_PAD_PROBE_PASS;
2943     }
2944   }
2945 
2946   buf = gst_pad_probe_info_get_buffer (info);
2947   buf_info = mq_stream_buf_new ();
2948 
2949   pts = GST_BUFFER_PTS (buf);
2950   dts = GST_BUFFER_DTS (buf);
2951   if (GST_BUFFER_PTS_IS_VALID (buf))
2952     ts = GST_BUFFER_PTS (buf);
2953   else
2954     ts = GST_BUFFER_DTS (buf);
2955 
2956   GST_LOG_OBJECT (pad,
2957       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2958       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2959       GST_TIME_ARGS (dts));
2960 
2961 #ifdef OHOS_OPT_COMPAT
2962 // ohos.opt.compat.0018
2963 // to avoid when buffer take too many time in src,
2964 // cause pst comes to clock none.
2965   GST_BUFFER_PTS (buf) = ts;
2966   GST_BUFFER_DTS (buf) = ts;
2967 #endif
2968 
2969   GST_SPLITMUX_LOCK (splitmux);
2970 
2971   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2972     ret = GST_FLOW_FLUSHING;
2973     goto beach;
2974   }
2975 
2976   /* If this buffer has a timestamp, advance the input timestamp of the
2977    * stream */
2978   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2979     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2980 
2981     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2982         GST_STIME_ARGS (running_time));
2983 
2984     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2985     if (GST_CLOCK_STIME_IS_VALID (running_time)
2986         && running_time > ctx->in_running_time)
2987       ctx->in_running_time = running_time;
2988   } else {
2989     running_time = ctx->in_running_time;
2990   }
2991 
2992   if (GST_CLOCK_TIME_IS_VALID (pts))
2993     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2994   else
2995     running_time_pts = GST_CLOCK_STIME_NONE;
2996 
2997   if (GST_CLOCK_TIME_IS_VALID (dts))
2998     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2999   else
3000     running_time_dts = GST_CLOCK_STIME_NONE;
3001 
3002   /* Try to make sure we have a valid running time */
3003   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
3004     ctx->in_running_time =
3005         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
3006   }
3007 
3008   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
3009       GST_STIME_ARGS (ctx->in_running_time));
3010 
3011   buf_info->run_ts = ctx->in_running_time;
3012   buf_info->buf_size = gst_buffer_get_size (buf);
3013   buf_info->duration = GST_BUFFER_DURATION (buf);
3014 
3015   if (ctx->is_reference) {
3016     InputGop *gop = NULL;
3017     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
3018 
3019     /* initialize fragment_start_time if it was not set yet (i.e. for the
3020      * first fragment), or otherwise set it to the minimum observed time */
3021     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
3022         || splitmux->fragment_start_time > running_time) {
3023       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
3024         splitmux->fragment_start_time_pts = running_time_pts;
3025       splitmux->fragment_start_time = running_time;
3026 
3027       GST_LOG_OBJECT (splitmux,
3028           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
3029           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
3030           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
3031 
3032       /* Also take this as the first start time when starting up,
3033        * so that we start counting overflow from the first frame */
3034       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
3035           || splitmux->max_in_running_time < splitmux->fragment_start_time)
3036         splitmux->max_in_running_time = splitmux->fragment_start_time;
3037 
3038       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
3039         splitmux->max_in_running_time_dts = running_time_dts;
3040 
3041       if (tc_meta) {
3042         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
3043 
3044         splitmux->next_fragment_start_tc_time =
3045             calculate_next_max_timecode (splitmux, &tc_meta->tc,
3046             running_time, NULL);
3047         if (splitmux->tc_interval
3048             && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
3049         {
3050           GST_WARNING_OBJECT (splitmux,
3051               "Couldn't calculate next fragment start time for timecode mode");
3052         }
3053 #ifndef GST_DISABLE_GST_DEBUG
3054         {
3055           gchar *tc_str;
3056 
3057           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3058           GST_DEBUG_OBJECT (splitmux,
3059               "Initialize fragment start timecode %s, next fragment start timecode time %"
3060               GST_TIME_FORMAT, tc_str,
3061               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
3062           g_free (tc_str);
3063         }
3064 #endif
3065       }
3066     }
3067 
3068 
3069     /* First check if we're at the very first GOP and the tracking was created
3070      * from a GAP event. In that case don't start a new GOP on keyframes but
3071      * just updated it as needed */
3072     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3073 
3074     if (!gop || (!gop->from_gap
3075             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3076       gop = g_slice_new0 (InputGop);
3077 
3078       gop->start_time = running_time;
3079       gop->start_time_pts = running_time_pts;
3080 
3081       GST_LOG_OBJECT (splitmux,
3082           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3083           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3084           GST_STIME_ARGS (gop->start_time_pts));
3085 
3086       if (tc_meta) {
3087         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3088 
3089 #ifndef GST_DISABLE_GST_DEBUG
3090         {
3091           gchar *tc_str;
3092 
3093           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3094           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3095           g_free (tc_str);
3096         }
3097 #endif
3098       }
3099 
3100       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3101     } else {
3102       gop->from_gap = FALSE;
3103 
3104       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3105           || gop->start_time > running_time) {
3106         gop->start_time = running_time;
3107 
3108         GST_LOG_OBJECT (splitmux,
3109             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3110             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3111             GST_STIME_ARGS (gop->start_time_pts));
3112 
3113         if (tc_meta) {
3114           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3115 
3116 #ifndef GST_DISABLE_GST_DEBUG
3117           {
3118             gchar *tc_str;
3119 
3120             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3121             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3122                 tc_str);
3123             g_free (tc_str);
3124           }
3125 #endif
3126         }
3127       }
3128     }
3129 
3130     /* Check whether we need to request next keyframe depending on
3131      * current running time */
3132     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3133       GST_WARNING_OBJECT (splitmux,
3134           "Could not request a keyframe. Files may not split at the exact location they should");
3135     }
3136   }
3137 
3138   {
3139     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3140 
3141     if (gop) {
3142       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3143           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3144           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3145           gop->total_bytes, gop->total_bytes);
3146     }
3147   }
3148 
3149   loop_again = TRUE;
3150   do {
3151     if (ctx->flushing)
3152       break;
3153 
3154     switch (splitmux->input_state) {
3155       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3156         if (ctx->is_releasing) {
3157           /* The pad belonging to this context is being released */
3158           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3159               "running. Data might not drain correctly");
3160           loop_again = FALSE;
3161         } else if (ctx->is_reference) {
3162           const InputGop *gop, *next_gop;
3163 
3164           /* This is the reference context. If it's a keyframe,
3165            * it marks the start of a new GOP and we should wait in
3166            * check_completed_gop before continuing, but either way
3167            * (keyframe or no, we'll pass this buffer through after
3168            * so set loop_again to FALSE */
3169           loop_again = FALSE;
3170 
3171           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3172           g_assert (gop != NULL);
3173           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3174 
3175           if (ctx->in_running_time > splitmux->max_in_running_time)
3176             splitmux->max_in_running_time = ctx->in_running_time;
3177           if (running_time_dts > splitmux->max_in_running_time_dts)
3178             splitmux->max_in_running_time_dts = running_time_dts;
3179 
3180           GST_LOG_OBJECT (splitmux,
3181               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3182               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3183               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3184 
3185           if (!next_gop) {
3186             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3187             /* Allow other input pads to catch up to here too */
3188             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3189             break;
3190           }
3191 
3192           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3193             GST_INFO_OBJECT (pad,
3194                 "Have keyframe with running time %" GST_STIME_FORMAT,
3195                 GST_STIME_ARGS (ctx->in_running_time));
3196             keyframe = TRUE;
3197           }
3198 
3199           if (running_time_dts != GST_CLOCK_STIME_NONE
3200               && running_time_dts < next_gop->start_time_pts) {
3201             GST_DEBUG_OBJECT (splitmux,
3202                 "Waiting until DTS (%" GST_STIME_FORMAT
3203                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3204                 GST_STIME_ARGS (running_time_dts),
3205                 GST_STIME_ARGS (next_gop->start_time_pts));
3206             /* Allow other input pads to catch up to here too */
3207             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3208             break;
3209           }
3210 
3211           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3212           /* Wake up other input pads to collect this GOP */
3213           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3214           check_completed_gop (splitmux, ctx);
3215         } else {
3216           /* Pass this buffer if the reference ctx is far enough ahead */
3217           if (ctx->in_running_time < splitmux->max_in_running_time) {
3218             loop_again = FALSE;
3219             break;
3220           }
3221 
3222           /* We're still waiting for a keyframe on the reference pad, sleep */
3223           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3224           GST_SPLITMUX_WAIT_INPUT (splitmux);
3225           GST_LOG_OBJECT (pad,
3226               "Done sleeping for GOP start input state now %d",
3227               splitmux->input_state);
3228         }
3229         break;
3230       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3231         /* We're collecting a GOP, this is only ever called for non-reference
3232          * contexts as the reference context would be waiting inside
3233          * check_completed_gop() */
3234 
3235         g_assert (!ctx->is_reference);
3236 
3237         /* If we overran the target timestamp, it might be time to process
3238          * the GOP, otherwise bail out for more data. */
3239         GST_LOG_OBJECT (pad,
3240             "Checking TS %" GST_STIME_FORMAT " against max %"
3241             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3242             GST_STIME_ARGS (splitmux->max_in_running_time));
3243 
3244         if (ctx->in_running_time < splitmux->max_in_running_time) {
3245           loop_again = FALSE;
3246           break;
3247         }
3248 
3249         GST_LOG_OBJECT (pad,
3250             "Collected last packet of GOP. Checking other pads");
3251         check_completed_gop (splitmux, ctx);
3252         break;
3253       }
3254       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3255         loop_again = FALSE;
3256         break;
3257       default:
3258         loop_again = FALSE;
3259         break;
3260     }
3261   }
3262   while (loop_again);
3263 
3264   if (keyframe && ctx->is_reference)
3265     splitmux->queued_keyframes++;
3266   buf_info->keyframe = keyframe;
3267 
3268   /* Update total input byte counter for overflow detect unless we're after
3269    * EOS now */
3270   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3271       && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3272     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3273 
3274     /* We must have a GOP at this point */
3275     g_assert (gop != NULL);
3276 
3277     gop->total_bytes += buf_info->buf_size;
3278     if (ctx->is_reference) {
3279       gop->reference_bytes += buf_info->buf_size;
3280     }
3281   }
3282 
3283   /* Now add this buffer to the queue just before returning */
3284   g_queue_push_head (&ctx->queued_bufs, buf_info);
3285 
3286   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3287       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3288 
3289   GST_SPLITMUX_UNLOCK (splitmux);
3290   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3291   return GST_PAD_PROBE_PASS;
3292 
3293 beach:
3294   GST_SPLITMUX_UNLOCK (splitmux);
3295   if (buf_info)
3296     mq_stream_buf_free (buf_info);
3297   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3298   return GST_PAD_PROBE_PASS;
3299 }
3300 
3301 static void
grow_blocked_queues(GstSplitMuxSink * splitmux)3302 grow_blocked_queues (GstSplitMuxSink * splitmux)
3303 {
3304   GList *cur;
3305 
3306   /* Scan other queues for full-ness and grow them */
3307   for (cur = g_list_first (splitmux->contexts);
3308       cur != NULL; cur = g_list_next (cur)) {
3309     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3310     guint cur_limit;
3311     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3312 
3313     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3314     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3315 
3316     if (cur_len >= cur_limit) {
3317       cur_limit = cur_len + 1;
3318       GST_DEBUG_OBJECT (tmpctx->q,
3319           "Queue overflowed and needs enlarging. Growing to %u buffers",
3320           cur_limit);
3321       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3322     }
3323   }
3324 }
3325 
3326 static void
handle_q_underrun(GstElement * q,gpointer user_data)3327 handle_q_underrun (GstElement * q, gpointer user_data)
3328 {
3329   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3330   GstSplitMuxSink *splitmux = ctx->splitmux;
3331 
3332   GST_SPLITMUX_LOCK (splitmux);
3333   GST_DEBUG_OBJECT (q,
3334       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3335       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3336   grow_blocked_queues (splitmux);
3337   GST_SPLITMUX_UNLOCK (splitmux);
3338 }
3339 
3340 static void
handle_q_overrun(GstElement * q,gpointer user_data)3341 handle_q_overrun (GstElement * q, gpointer user_data)
3342 {
3343   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3344   GstSplitMuxSink *splitmux = ctx->splitmux;
3345   gboolean allow_grow = FALSE;
3346 
3347   GST_SPLITMUX_LOCK (splitmux);
3348   GST_DEBUG_OBJECT (q,
3349       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3350       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3351 
3352   if (splitmux->queued_keyframes < 2) {
3353     /* Less than a full GOP queued, grow the queue */
3354     allow_grow = TRUE;
3355   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3356     allow_grow = TRUE;
3357   } else {
3358     /* If another queue is starved, grow */
3359     GList *cur;
3360     for (cur = g_list_first (splitmux->contexts);
3361         cur != NULL; cur = g_list_next (cur)) {
3362       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3363       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3364         allow_grow = TRUE;
3365       }
3366     }
3367   }
3368   GST_SPLITMUX_UNLOCK (splitmux);
3369 
3370   if (allow_grow) {
3371     guint cur_limit;
3372 
3373     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3374     cur_limit++;
3375 
3376     GST_DEBUG_OBJECT (q,
3377         "Queue overflowed and needs enlarging. Growing to %u buffers",
3378         cur_limit);
3379 
3380     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3381   }
3382 }
3383 
3384 /* Called with SPLITMUX lock held */
3385 static const gchar *
lookup_muxer_pad(GstSplitMuxSink * splitmux,const gchar * sinkpad_name)3386 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3387 {
3388   const gchar *ret = NULL;
3389 
3390   if (splitmux->muxerpad_map == NULL)
3391     return NULL;
3392 
3393   if (sinkpad_name == NULL) {
3394     GST_WARNING_OBJECT (splitmux,
3395         "Can't look up request pad in pad map without providing a pad name");
3396     return NULL;
3397   }
3398 
3399   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3400   if (ret) {
3401     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3402         ret);
3403     return g_strdup (ret);
3404   }
3405 
3406   return NULL;
3407 }
3408 
3409 static GstPad *
gst_splitmux_sink_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)3410 gst_splitmux_sink_request_new_pad (GstElement * element,
3411     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3412 {
3413   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3414   GstPadTemplate *mux_template = NULL;
3415   GstPad *ret = NULL, *muxpad = NULL;
3416   GstElement *q;
3417   GstPad *q_sink = NULL, *q_src = NULL;
3418   gchar *gname, *qname;
3419   gboolean is_primary_video = FALSE, is_video = FALSE,
3420       muxer_is_requestpad = FALSE;
3421   MqStreamCtx *ctx;
3422   const gchar *muxer_padname = NULL;
3423 
3424   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3425 
3426   GST_SPLITMUX_LOCK (splitmux);
3427   if (!create_muxer (splitmux))
3428     goto fail;
3429   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3430 
3431   if (g_str_equal (templ->name_template, "video") ||
3432       g_str_has_prefix (templ->name_template, "video_aux_")) {
3433     is_primary_video = g_str_equal (templ->name_template, "video");
3434     if (is_primary_video && splitmux->have_video)
3435       goto already_have_video;
3436     is_video = TRUE;
3437   }
3438 
3439   /* See if there's a pad map and it lists this pad */
3440   muxer_padname = lookup_muxer_pad (splitmux, name);
3441 
3442   if (muxer_padname == NULL) {
3443     if (is_video) {
3444       /* FIXME: Look for a pad template with matching caps, rather than by name */
3445       GST_DEBUG_OBJECT (element,
3446           "searching for pad-template with name 'video_%%u'");
3447       mux_template =
3448           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3449           (splitmux->muxer), "video_%u");
3450 
3451       /* Fallback to find sink pad templates named 'video' (flvmux) */
3452       if (!mux_template) {
3453         GST_DEBUG_OBJECT (element,
3454             "searching for pad-template with name 'video'");
3455         mux_template =
3456             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3457             (splitmux->muxer), "video");
3458       }
3459       name = NULL;
3460     } else {
3461       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3462           templ->name_template);
3463       mux_template =
3464           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3465           (splitmux->muxer), templ->name_template);
3466 
3467       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3468       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3469         GST_DEBUG_OBJECT (element,
3470             "searching for pad-template with name 'audio'");
3471         mux_template =
3472             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3473             (splitmux->muxer), "audio");
3474         name = NULL;
3475       }
3476     }
3477 
3478     if (mux_template == NULL) {
3479       GST_DEBUG_OBJECT (element,
3480           "searching for pad-template with name 'sink_%%d'");
3481       mux_template =
3482           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3483           (splitmux->muxer), "sink_%d");
3484       name = NULL;
3485     }
3486     if (mux_template == NULL) {
3487       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3488       mux_template =
3489           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3490           (splitmux->muxer), "sink");
3491       name = NULL;
3492     }
3493 
3494     if (mux_template == NULL) {
3495       GST_ERROR_OBJECT (element,
3496           "unable to find a suitable sink pad-template on the muxer");
3497       goto fail;
3498     }
3499     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3500         mux_template->name_template);
3501 
3502     if (mux_template->presence == GST_PAD_REQUEST) {
3503       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3504 
3505       muxpad =
3506           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3507       muxer_is_requestpad = TRUE;
3508     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3509       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3510 
3511       muxpad =
3512           gst_element_get_static_pad (splitmux->muxer,
3513           mux_template->name_template);
3514     } else {
3515       GST_ERROR_OBJECT (element,
3516           "unexpected pad presence %d", mux_template->presence);
3517       goto fail;
3518     }
3519   } else {
3520     /* Have a muxer pad name */
3521     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3522       if ((muxpad =
3523               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3524         muxer_is_requestpad = TRUE;
3525     }
3526     g_free ((gchar *) muxer_padname);
3527     muxer_padname = NULL;
3528   }
3529 
3530   /* One way or another, we must have a muxer pad by now */
3531   if (muxpad == NULL)
3532     goto fail;
3533 
3534   if (is_primary_video)
3535     gname = g_strdup ("video");
3536   else if (name == NULL)
3537     gname = gst_pad_get_name (muxpad);
3538   else
3539     gname = g_strdup (name);
3540 
3541   qname = g_strdup_printf ("queue_%s", gname);
3542   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3543     g_free (qname);
3544     goto fail;
3545   }
3546   g_free (qname);
3547 
3548   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3549 
3550   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3551       "max-size-buffers", 5, NULL);
3552 
3553   q_sink = gst_element_get_static_pad (q, "sink");
3554   q_src = gst_element_get_static_pad (q, "src");
3555 
3556   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3557     if (muxer_is_requestpad)
3558       gst_element_release_request_pad (splitmux->muxer, muxpad);
3559     gst_object_unref (GST_OBJECT (muxpad));
3560     goto fail;
3561   }
3562 
3563   gst_object_unref (GST_OBJECT (muxpad));
3564 
3565   ctx = mq_stream_ctx_new (splitmux);
3566   /* Context holds a ref: */
3567   ctx->q = gst_object_ref (q);
3568   ctx->srcpad = q_src;
3569   ctx->sinkpad = q_sink;
3570   ctx->q_overrun_id =
3571       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3572   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3573 
3574   ctx->src_pad_block_id =
3575       gst_pad_add_probe (q_src,
3576       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3577       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3578   if (is_primary_video && splitmux->reference_ctx != NULL) {
3579     splitmux->reference_ctx->is_reference = FALSE;
3580     splitmux->reference_ctx = NULL;
3581   }
3582   if (splitmux->reference_ctx == NULL) {
3583     splitmux->reference_ctx = ctx;
3584     ctx->is_reference = TRUE;
3585   }
3586 
3587   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3588   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3589 
3590   ctx->sink_pad_block_id =
3591       gst_pad_add_probe (q_sink,
3592       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3593       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3594       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3595 
3596   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3597       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3598 
3599   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3600 
3601   g_free (gname);
3602 
3603   if (is_primary_video)
3604     splitmux->have_video = TRUE;
3605 
3606   gst_pad_set_active (ret, TRUE);
3607   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3608 
3609   GST_SPLITMUX_UNLOCK (splitmux);
3610 
3611   return ret;
3612 fail:
3613   GST_SPLITMUX_UNLOCK (splitmux);
3614 
3615   if (q_sink)
3616     gst_object_unref (q_sink);
3617   if (q_src)
3618     gst_object_unref (q_src);
3619   return NULL;
3620 already_have_video:
3621   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3622   GST_SPLITMUX_UNLOCK (splitmux);
3623   return NULL;
3624 }
3625 
3626 static void
gst_splitmux_sink_release_pad(GstElement * element,GstPad * pad)3627 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3628 {
3629   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3630   GstPad *muxpad = NULL;
3631   MqStreamCtx *ctx =
3632       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3633 
3634   GST_SPLITMUX_LOCK (splitmux);
3635 
3636   if (splitmux->muxer == NULL)
3637     goto fail;                  /* Elements don't exist yet - nothing to release */
3638 
3639   GST_INFO_OBJECT (pad, "releasing request pad");
3640 
3641   muxpad = gst_pad_get_peer (ctx->srcpad);
3642 
3643   /* Remove the context from our consideration */
3644   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3645 
3646   GST_SPLITMUX_UNLOCK (splitmux);
3647 
3648   if (ctx->sink_pad_block_id) {
3649     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3650     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3651   }
3652 
3653   if (ctx->src_pad_block_id)
3654     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3655 
3656   GST_SPLITMUX_LOCK (splitmux);
3657 
3658   ctx->is_releasing = TRUE;
3659   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3660 
3661   /* Can release the context now */
3662   mq_stream_ctx_free (ctx);
3663   if (ctx == splitmux->reference_ctx)
3664     splitmux->reference_ctx = NULL;
3665 
3666   /* Release and free the muxer input */
3667   if (muxpad) {
3668     gst_element_release_request_pad (splitmux->muxer, muxpad);
3669     gst_object_unref (muxpad);
3670   }
3671 
3672   if (GST_PAD_PAD_TEMPLATE (pad) &&
3673       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3674               (pad)), "video"))
3675     splitmux->have_video = FALSE;
3676 
3677   gst_element_remove_pad (element, pad);
3678 
3679   /* Reset the internal elements only after all request pads are released */
3680   if (splitmux->contexts == NULL)
3681     gst_splitmux_reset_elements (splitmux);
3682 
3683   /* Wake up other input streams to check if the completion conditions have
3684    * changed */
3685   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3686 
3687 fail:
3688   GST_SPLITMUX_UNLOCK (splitmux);
3689 }
3690 
3691 static GstElement *
create_element(GstSplitMuxSink * splitmux,const gchar * factory,const gchar * name,gboolean locked)3692 create_element (GstSplitMuxSink * splitmux,
3693     const gchar * factory, const gchar * name, gboolean locked)
3694 {
3695   GstElement *ret = gst_element_factory_make (factory, name);
3696   if (ret == NULL) {
3697     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3698     return NULL;
3699   }
3700 
3701   if (locked) {
3702     /* Ensure the sink starts in locked state and NULL - it will be changed
3703      * by the filename setting code */
3704     gst_element_set_locked_state (ret, TRUE);
3705     gst_element_set_state (ret, GST_STATE_NULL);
3706   }
3707 
3708   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3709     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3710     gst_object_unref (ret);
3711     return NULL;
3712   }
3713 
3714   return ret;
3715 }
3716 
3717 static gboolean
create_muxer(GstSplitMuxSink * splitmux)3718 create_muxer (GstSplitMuxSink * splitmux)
3719 {
3720   /* Create internal elements */
3721   if (splitmux->muxer == NULL) {
3722     GstElement *provided_muxer = NULL;
3723 
3724     GST_OBJECT_LOCK (splitmux);
3725     if (splitmux->provided_muxer != NULL)
3726       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3727     GST_OBJECT_UNLOCK (splitmux);
3728 
3729     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3730         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3731       if ((splitmux->muxer =
3732               create_element (splitmux,
3733                   splitmux->muxer_factory ? splitmux->
3734                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3735         goto fail;
3736     } else if (splitmux->async_finalize) {
3737       if ((splitmux->muxer =
3738               create_element (splitmux, splitmux->muxer_factory, "muxer",
3739                   FALSE)) == NULL)
3740         goto fail;
3741       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3742         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3743             splitmux->muxer_preset);
3744       if (splitmux->muxer_properties)
3745         gst_structure_foreach (splitmux->muxer_properties,
3746             _set_property_from_structure, splitmux->muxer);
3747     } else {
3748       /* Ensure it's not in locked state (we might be reusing an old element) */
3749       gst_element_set_locked_state (provided_muxer, FALSE);
3750       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3751         g_warning ("Could not add muxer element - splitmuxsink will not work");
3752         gst_object_unref (provided_muxer);
3753         goto fail;
3754       }
3755 
3756       splitmux->muxer = provided_muxer;
3757       gst_object_unref (provided_muxer);
3758     }
3759 
3760     if (splitmux->use_robust_muxing) {
3761       update_muxer_properties (splitmux);
3762     }
3763   }
3764 
3765   return TRUE;
3766 fail:
3767   return FALSE;
3768 }
3769 
3770 static GstElement *
find_sink(GstElement * e)3771 find_sink (GstElement * e)
3772 {
3773   GstElement *res = NULL;
3774   GstIterator *iter;
3775   gboolean done = FALSE;
3776   GValue data = { 0, };
3777 
3778   if (!GST_IS_BIN (e))
3779     return e;
3780 
3781   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3782     return e;
3783 
3784   iter = gst_bin_iterate_sinks (GST_BIN (e));
3785   while (!done) {
3786     switch (gst_iterator_next (iter, &data)) {
3787       case GST_ITERATOR_OK:
3788       {
3789         GstElement *child = g_value_get_object (&data);
3790         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3791                 "location") != NULL) {
3792           res = child;
3793           done = TRUE;
3794         }
3795         g_value_reset (&data);
3796         break;
3797       }
3798       case GST_ITERATOR_RESYNC:
3799         gst_iterator_resync (iter);
3800         break;
3801       case GST_ITERATOR_DONE:
3802         done = TRUE;
3803         break;
3804       case GST_ITERATOR_ERROR:
3805         g_assert_not_reached ();
3806         break;
3807     }
3808   }
3809   g_value_unset (&data);
3810   gst_iterator_free (iter);
3811 
3812   return res;
3813 }
3814 
3815 static gboolean
create_sink(GstSplitMuxSink * splitmux)3816 create_sink (GstSplitMuxSink * splitmux)
3817 {
3818   GstElement *provided_sink = NULL;
3819 
3820   if (splitmux->active_sink == NULL) {
3821 
3822     GST_OBJECT_LOCK (splitmux);
3823     if (splitmux->provided_sink != NULL)
3824       provided_sink = gst_object_ref (splitmux->provided_sink);
3825     GST_OBJECT_UNLOCK (splitmux);
3826 
3827     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3828         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3829       if ((splitmux->sink =
3830               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3831         goto fail;
3832       splitmux->active_sink = splitmux->sink;
3833     } else if (splitmux->async_finalize) {
3834       if ((splitmux->sink =
3835               create_element (splitmux, splitmux->sink_factory, "sink",
3836                   TRUE)) == NULL)
3837         goto fail;
3838       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3839         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3840             splitmux->sink_preset);
3841       if (splitmux->sink_properties)
3842         gst_structure_foreach (splitmux->sink_properties,
3843             _set_property_from_structure, splitmux->sink);
3844       splitmux->active_sink = splitmux->sink;
3845     } else {
3846       /* Ensure the sink starts in locked state and NULL - it will be changed
3847        * by the filename setting code */
3848       gst_element_set_locked_state (provided_sink, TRUE);
3849       gst_element_set_state (provided_sink, GST_STATE_NULL);
3850       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3851         g_warning ("Could not add sink elements - splitmuxsink will not work");
3852         gst_object_unref (provided_sink);
3853         goto fail;
3854       }
3855 
3856       splitmux->active_sink = provided_sink;
3857 
3858       /* The bin holds a ref now, we can drop our tmp ref */
3859       gst_object_unref (provided_sink);
3860 
3861       /* Find the sink element */
3862       splitmux->sink = find_sink (splitmux->active_sink);
3863       if (splitmux->sink == NULL) {
3864         g_warning
3865             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3866         goto fail;
3867       }
3868     }
3869 
3870 #if 1
3871     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3872             "async") != NULL) {
3873       /* async child elements are causing state change races and weird
3874        * failures, so let's try and turn that off */
3875       g_object_set (splitmux->sink, "async", FALSE, NULL);
3876     }
3877 #endif
3878 
3879     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3880       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3881       goto fail;
3882     }
3883   }
3884 
3885   return TRUE;
3886 fail:
3887   return FALSE;
3888 }
3889 
3890 #ifdef __GNUC__
3891 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3892 #endif
3893 static void
set_next_filename(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)3894 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3895 {
3896   gchar *fname = NULL;
3897   GstSample *sample;
3898   GstCaps *caps;
3899 
3900   gst_splitmux_sink_ensure_max_files (splitmux);
3901 
3902   if (ctx->cur_out_buffer == NULL) {
3903     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3904   }
3905 
3906   caps = gst_pad_get_current_caps (ctx->srcpad);
3907   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3908   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3909       splitmux->fragment_id, sample, &fname);
3910   gst_sample_unref (sample);
3911   if (caps)
3912     gst_caps_unref (caps);
3913 
3914   if (fname == NULL) {
3915     /* Fallback to the old signal if the new one returned nothing */
3916     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3917         splitmux->fragment_id, &fname);
3918   }
3919 
3920   if (!fname)
3921     fname = splitmux->location ?
3922         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3923 
3924   if (fname) {
3925     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3926     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3927             "location") != NULL)
3928       g_object_set (splitmux->sink, "location", fname, NULL);
3929     g_free (fname);
3930   }
3931 
3932   splitmux->fragment_id++;
3933 }
3934 
3935 /* called with GST_SPLITMUX_LOCK */
3936 static void
do_async_start(GstSplitMuxSink * splitmux)3937 do_async_start (GstSplitMuxSink * splitmux)
3938 {
3939   GstMessage *message;
3940 
3941   if (!splitmux->need_async_start) {
3942     GST_INFO_OBJECT (splitmux, "no async_start needed");
3943     return;
3944   }
3945 
3946   splitmux->async_pending = TRUE;
3947 
3948   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3949   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3950 
3951   GST_SPLITMUX_UNLOCK (splitmux);
3952   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3953       (splitmux), message);
3954   GST_SPLITMUX_LOCK (splitmux);
3955 }
3956 
3957 /* called with GST_SPLITMUX_LOCK */
3958 static void
do_async_done(GstSplitMuxSink * splitmux)3959 do_async_done (GstSplitMuxSink * splitmux)
3960 {
3961   GstMessage *message;
3962 
3963   if (splitmux->async_pending) {
3964     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3965     splitmux->async_pending = FALSE;
3966     GST_SPLITMUX_UNLOCK (splitmux);
3967 
3968     message =
3969         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3970         GST_CLOCK_TIME_NONE);
3971     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3972         (splitmux), message);
3973     GST_SPLITMUX_LOCK (splitmux);
3974   }
3975 
3976   splitmux->need_async_start = FALSE;
3977 }
3978 
3979 static void
gst_splitmux_sink_reset(GstSplitMuxSink * splitmux)3980 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3981 {
3982   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3983   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3984 
3985   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3986   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3987   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3988 
3989   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3990   g_queue_clear (&splitmux->pending_input_gops);
3991 
3992   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3993   splitmux->fragment_total_bytes = 0;
3994   splitmux->fragment_reference_bytes = 0;
3995   splitmux->muxed_out_bytes = 0;
3996   splitmux->ready_for_output = FALSE;
3997 
3998   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3999   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4000 
4001   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
4002   gst_queue_array_clear (splitmux->times_to_split);
4003 
4004   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
4005   splitmux->queued_keyframes = 0;
4006 
4007   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
4008   g_queue_clear (&splitmux->out_cmd_q);
4009 }
4010 
4011 static GstStateChangeReturn
gst_splitmux_sink_change_state(GstElement * element,GstStateChange transition)4012 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
4013 {
4014   GstStateChangeReturn ret;
4015   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
4016 
4017   switch (transition) {
4018     case GST_STATE_CHANGE_NULL_TO_READY:{
4019       GST_SPLITMUX_LOCK (splitmux);
4020       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
4021         ret = GST_STATE_CHANGE_FAILURE;
4022         GST_SPLITMUX_UNLOCK (splitmux);
4023         goto beach;
4024       }
4025       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
4026       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
4027       GST_SPLITMUX_UNLOCK (splitmux);
4028       splitmux->fragment_id = splitmux->start_index;
4029       break;
4030     }
4031     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4032       GST_SPLITMUX_LOCK (splitmux);
4033       /* Make sure contexts and tracking times are cleared, in case we're being reused */
4034       gst_splitmux_sink_reset (splitmux);
4035       /* Start by collecting one input on each pad */
4036       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
4037       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
4038 
4039       GST_SPLITMUX_UNLOCK (splitmux);
4040 
4041       GST_SPLITMUX_STATE_LOCK (splitmux);
4042       splitmux->shutdown = FALSE;
4043       GST_SPLITMUX_STATE_UNLOCK (splitmux);
4044       break;
4045     }
4046     case GST_STATE_CHANGE_PAUSED_TO_READY:
4047     case GST_STATE_CHANGE_READY_TO_READY:
4048       g_atomic_int_set (&(splitmux->split_requested), FALSE);
4049       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4050       /* Fall through */
4051     case GST_STATE_CHANGE_READY_TO_NULL:
4052       GST_SPLITMUX_STATE_LOCK (splitmux);
4053       splitmux->shutdown = TRUE;
4054       GST_SPLITMUX_STATE_UNLOCK (splitmux);
4055 
4056       GST_SPLITMUX_LOCK (splitmux);
4057       gst_splitmux_sink_reset (splitmux);
4058       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
4059       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
4060       /* Wake up any blocked threads */
4061       GST_LOG_OBJECT (splitmux,
4062           "State change -> NULL or READY. Waking threads");
4063       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4064       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4065       GST_SPLITMUX_UNLOCK (splitmux);
4066       break;
4067     default:
4068       break;
4069   }
4070 
4071   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4072   if (ret == GST_STATE_CHANGE_FAILURE)
4073     goto beach;
4074 
4075   switch (transition) {
4076     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4077       splitmux->need_async_start = TRUE;
4078       break;
4079     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4080       /* Change state async, because our child sink might not
4081        * be ready to do that for us yet if it's state is still locked */
4082 
4083       splitmux->need_async_start = TRUE;
4084       /* we want to go async to PAUSED until we managed to configure and add the
4085        * sink */
4086       GST_SPLITMUX_LOCK (splitmux);
4087       do_async_start (splitmux);
4088       GST_SPLITMUX_UNLOCK (splitmux);
4089       ret = GST_STATE_CHANGE_ASYNC;
4090       break;
4091     }
4092     case GST_STATE_CHANGE_READY_TO_NULL:
4093       GST_SPLITMUX_LOCK (splitmux);
4094       splitmux->fragment_id = 0;
4095       /* Reset internal elements only if no pad contexts are using them */
4096       if (splitmux->contexts == NULL)
4097         gst_splitmux_reset_elements (splitmux);
4098       do_async_done (splitmux);
4099       GST_SPLITMUX_UNLOCK (splitmux);
4100       break;
4101     default:
4102       break;
4103   }
4104 
4105   return ret;
4106 
4107 beach:
4108   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4109     /* Cleanup elements on failed transition out of NULL */
4110     gst_splitmux_reset_elements (splitmux);
4111     GST_SPLITMUX_LOCK (splitmux);
4112     do_async_done (splitmux);
4113     GST_SPLITMUX_UNLOCK (splitmux);
4114   }
4115   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4116     /* READY to READY transition only happens when we're already
4117      * in READY state, but a child element is in NULL, which
4118      * happens when there's an error changing the state of the sink.
4119      * We need to make sure not to fail the state transition, or
4120      * the core won't transition us back to NULL successfully */
4121     ret = GST_STATE_CHANGE_SUCCESS;
4122   }
4123   return ret;
4124 }
4125 
4126 static void
gst_splitmux_sink_ensure_max_files(GstSplitMuxSink * splitmux)4127 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4128 {
4129   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4130     splitmux->fragment_id = 0;
4131   }
4132 }
4133 
4134 static void
split_now(GstSplitMuxSink * splitmux)4135 split_now (GstSplitMuxSink * splitmux)
4136 {
4137   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4138 }
4139 
4140 static void
split_after(GstSplitMuxSink * splitmux)4141 split_after (GstSplitMuxSink * splitmux)
4142 {
4143   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4144 }
4145 
4146 static void
split_at_running_time(GstSplitMuxSink * splitmux,GstClockTime split_time)4147 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4148 {
4149   gboolean send_keyframe_requests;
4150 
4151   GST_SPLITMUX_LOCK (splitmux);
4152   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4153   send_keyframe_requests = splitmux->send_keyframe_requests;
4154   GST_SPLITMUX_UNLOCK (splitmux);
4155 
4156   if (send_keyframe_requests) {
4157     GstEvent *ev =
4158         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4159     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4160         GST_TIME_ARGS (split_time));
4161     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4162       GST_WARNING_OBJECT (splitmux,
4163           "Could not request keyframe at %" GST_TIME_FORMAT,
4164           GST_TIME_ARGS (split_time));
4165     }
4166   }
4167 }
4168