• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:element-splitmuxsink
22  * @short_description: Muxer wrapper for splitting output stream by size or time
23  *
24  * This element wraps a muxer and a sink, and starts a new file when the mux
 * contents are about to cross a threshold of maximum size or maximum time,
26  * splitting at video keyframe boundaries. Exactly one input video stream
27  * can be muxed, with as many accompanying audio and subtitle streams as
28  * desired.
29  *
30  * By default, it uses mp4mux and filesink, but they can be changed via
31  * the 'muxer' and 'sink' properties.
32  *
33  * The minimum file size is 1 GOP, however - so limits may be overrun if the
34  * distance between any 2 keyframes is larger than the limits.
35  *
36  * If a video stream is available, the splitting process is driven by the video
37  * stream contents, and the video stream must contain closed GOPs for the output
38  * file parts to be played individually correctly. In the absence of a video
39  * stream, the first available stream is used as reference for synchronization.
40  *
41  * In the async-finalize mode, when the threshold is crossed, the old muxer
42  * and sink is disconnected from the pipeline and left to finish the file
43  * asynchronously, and a new muxer and sink is created to continue with the
44  * next fragment. For that reason, instead of muxer and sink objects, the
45  * muxer-factory and sink-factory properties are used to construct the new
46  * objects, together with muxer-properties and sink-properties.
47  *
48  * <refsect2>
49  * <title>Example pipelines</title>
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
 * Records a video stream captured from a v4l2 device and muxes it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  * </refsect2>
64  */
65 
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69 
70 #include <string.h>
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
74 
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
77 
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
82 
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
85 
86 static void split_now (GstSplitMuxSink * splitmux);
87 static void split_after (GstSplitMuxSink * splitmux);
88 static void split_at_running_time (GstSplitMuxSink * splitmux,
89     GstClockTime split_time);
90 
91 enum
92 {
93   PROP_0,
94   PROP_LOCATION,
95   PROP_MAX_SIZE_TIME,
96   PROP_MAX_SIZE_BYTES,
97   PROP_MAX_SIZE_TIMECODE,
98   PROP_SEND_KEYFRAME_REQUESTS,
99   PROP_MAX_FILES,
100   PROP_MUXER_OVERHEAD,
101   PROP_USE_ROBUST_MUXING,
102   PROP_ALIGNMENT_THRESHOLD,
103   PROP_MUXER,
104   PROP_SINK,
105   PROP_RESET_MUXER,
106   PROP_ASYNC_FINALIZE,
107   PROP_MUXER_FACTORY,
108   PROP_MUXER_PROPERTIES,
109   PROP_SINK_FACTORY,
110   PROP_SINK_PROPERTIES
111 };
112 
113 #define DEFAULT_MAX_SIZE_TIME       0
114 #define DEFAULT_MAX_SIZE_BYTES      0
115 #define DEFAULT_MAX_FILES           0
116 #define DEFAULT_MUXER_OVERHEAD      0.02
117 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
118 #define DEFAULT_ALIGNMENT_THRESHOLD 0
119 #define DEFAULT_MUXER "mp4mux"
120 #define DEFAULT_SINK "filesink"
121 #define DEFAULT_USE_ROBUST_MUXING FALSE
122 #define DEFAULT_RESET_MUXER TRUE
123 #define DEFAULT_ASYNC_FINALIZE FALSE
124 
125 typedef struct _AsyncEosHelper
126 {
127   MqStreamCtx *ctx;
128   GstPad *pad;
129 } AsyncEosHelper;
130 
131 enum
132 {
133   SIGNAL_FORMAT_LOCATION,
134   SIGNAL_FORMAT_LOCATION_FULL,
135   SIGNAL_SPLIT_NOW,
136   SIGNAL_SPLIT_AFTER,
137   SIGNAL_SPLIT_AT_RUNNING_TIME,
138   SIGNAL_MUXER_ADDED,
139   SIGNAL_SINK_ADDED,
140   SIGNAL_LAST
141 };
142 
143 static guint signals[SIGNAL_LAST];
144 
145 static GstStaticPadTemplate video_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("video",
147     GST_PAD_SINK,
148     GST_PAD_REQUEST,
149     GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate audio_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("audio_%u",
152     GST_PAD_SINK,
153     GST_PAD_REQUEST,
154     GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate subtitle_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
157     GST_PAD_SINK,
158     GST_PAD_REQUEST,
159     GST_STATIC_CAPS_ANY);
160 static GstStaticPadTemplate caption_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("caption_%u",
162     GST_PAD_SINK,
163     GST_PAD_REQUEST,
164     GST_STATIC_CAPS_ANY);
165 
166 static GQuark PAD_CONTEXT;
167 static GQuark EOS_FROM_US;
168 static GQuark RUNNING_TIME;
169 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
170  * to forward an incoming EOS message, but we cannot rely on the state of the
171  * splitmux anymore, so we set this qdata on the sink instead.
172  * The muxer and sink must be destroyed after both of these things have
173  * finished:
174  * 1) The EOS message has been sent when the fragment is ending
175  * 2) The muxer has been unlinked and relinked
176  * Therefore, EOS_FROM_US can have these two values:
177  * 0: EOS was not requested from us. Forward the message. The muxer and the
178  * sink will be destroyed together with the rest of the bin.
179  * 1: EOS was requested from us, but the other of the two tasks hasn't
180  * finished. Set EOS_FROM_US to 2 and do your stuff.
181  * 2: EOS was requested from us and the other of the two tasks has finished.
182  * Now we can destroy the muxer and the sink.
183  */
184 
185 static void
_do_init(void)186 _do_init (void)
187 {
188   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
189   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
190   RUNNING_TIME = g_quark_from_static_string ("running-time");
191 }
192 
193 #define gst_splitmux_sink_parent_class parent_class
194 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
195     _do_init ());
196 
197 static gboolean create_muxer (GstSplitMuxSink * splitmux);
198 static gboolean create_sink (GstSplitMuxSink * splitmux);
199 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
200     const GValue * value, GParamSpec * pspec);
201 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
202     GValue * value, GParamSpec * pspec);
203 static void gst_splitmux_sink_dispose (GObject * object);
204 static void gst_splitmux_sink_finalize (GObject * object);
205 
206 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
207     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
208 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
209 
210 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
211     element, GstStateChange transition);
212 
213 static void bus_handler (GstBin * bin, GstMessage * msg);
214 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
215 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
216 static void mq_stream_ctx_free (MqStreamCtx * ctx);
217 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
218 
219 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
220 static GstElement *create_element (GstSplitMuxSink * splitmux,
221     const gchar * factory, const gchar * name, gboolean locked);
222 
223 static void do_async_done (GstSplitMuxSink * splitmux);
224 
225 static MqStreamBuf *
mq_stream_buf_new(void)226 mq_stream_buf_new (void)
227 {
228   return g_slice_new0 (MqStreamBuf);
229 }
230 
231 static void
mq_stream_buf_free(MqStreamBuf * data)232 mq_stream_buf_free (MqStreamBuf * data)
233 {
234   g_slice_free (MqStreamBuf, data);
235 }
236 
237 static SplitMuxOutputCommand *
out_cmd_buf_new(void)238 out_cmd_buf_new (void)
239 {
240   return g_slice_new0 (SplitMuxOutputCommand);
241 }
242 
243 static void
out_cmd_buf_free(SplitMuxOutputCommand * data)244 out_cmd_buf_free (SplitMuxOutputCommand * data)
245 {
246   g_slice_free (SplitMuxOutputCommand, data);
247 }
248 
249 static void
gst_splitmux_sink_class_init(GstSplitMuxSinkClass * klass)250 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
251 {
252   GObjectClass *gobject_class = (GObjectClass *) klass;
253   GstElementClass *gstelement_class = (GstElementClass *) klass;
254   GstBinClass *gstbin_class = (GstBinClass *) klass;
255 
256   gobject_class->set_property = gst_splitmux_sink_set_property;
257   gobject_class->get_property = gst_splitmux_sink_get_property;
258   gobject_class->dispose = gst_splitmux_sink_dispose;
259   gobject_class->finalize = gst_splitmux_sink_finalize;
260 
261   gst_element_class_set_static_metadata (gstelement_class,
262       "Split Muxing Bin", "Generic/Bin/Muxer",
263       "Convenience bin that muxes incoming streams into multiple time/size limited files",
264       "Jan Schmidt <jan@centricular.com>");
265 
266   gst_element_class_add_static_pad_template (gstelement_class,
267       &video_sink_template);
268   gst_element_class_add_static_pad_template (gstelement_class,
269       &audio_sink_template);
270   gst_element_class_add_static_pad_template (gstelement_class,
271       &subtitle_sink_template);
272   gst_element_class_add_static_pad_template (gstelement_class,
273       &caption_sink_template);
274 
275   gstelement_class->change_state =
276       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
277   gstelement_class->request_new_pad =
278       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
279   gstelement_class->release_pad =
280       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
281 
282   gstbin_class->handle_message = bus_handler;
283 
284   g_object_class_install_property (gobject_class, PROP_LOCATION,
285       g_param_spec_string ("location", "File Output Pattern",
286           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
287           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
289       g_param_spec_double ("mux-overhead", "Muxing Overhead",
290           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
291           DEFAULT_MUXER_OVERHEAD,
292           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
293 
294   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
295       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
296           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
297           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
300           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
301           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
303       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
304           "Maximum difference in timecode between first and last frame. "
305           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
306           "Will only be effective if a timecode track is present.",
307           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
309       g_param_spec_boolean ("send-keyframe-requests",
310           "Request keyframes at max-size-time",
311           "Request a keyframe every max-size-time ns to try splitting at that point. "
312           "Needs max-size-bytes to be 0 in order to be effective.",
313           DEFAULT_SEND_KEYFRAME_REQUESTS,
314           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
316       g_param_spec_uint ("max-files", "Max files",
317           "Maximum number of files to keep on disk. Once the maximum is reached,"
318           "old files start to be deleted to make room for new ones.", 0,
319           G_MAXUINT, DEFAULT_MAX_FILES,
320           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
322       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
323           "Allow non-reference streams to be that many ns before the reference"
324           " stream",
325           0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327 
328   g_object_class_install_property (gobject_class, PROP_MUXER,
329       g_param_spec_object ("muxer", "Muxer",
330           "The muxer element to use (NULL = default mp4mux). "
331           "Valid only for async-finalize = FALSE",
332           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333   g_object_class_install_property (gobject_class, PROP_SINK,
334       g_param_spec_object ("sink", "Sink",
335           "The sink element (or element chain) to use (NULL = default filesink). "
336           "Valid only for async-finalize = FALSE",
337           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338 
339   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
340       g_param_spec_boolean ("use-robust-muxing",
341           "Support robust-muxing mode of some muxers",
342           "Check if muxers support robust muxing via the reserved-max-duration and "
343           "reserved-duration-remaining properties and use them if so. "
344           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
345           " create new fragments if the reserved header space is about to overflow. "
346           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
347           "manually by the app to a non-zero value for robust muxing to have an effect.",
348           DEFAULT_USE_ROBUST_MUXING,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350 
351   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
352       g_param_spec_boolean ("reset-muxer",
353           "Reset Muxer",
354           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
355           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356 
357   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
358       g_param_spec_boolean ("async-finalize",
359           "Finalize fragments asynchronously",
360           "Finalize each fragment asynchronously and start a new one",
361           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
363       g_param_spec_string ("muxer-factory", "Muxer factory",
364           "The muxer element factory to use (default = mp4mux). "
365           "Valid only for async-finalize = TRUE",
366           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
368       g_param_spec_boxed ("muxer-properties", "Muxer properties",
369           "The muxer element properties to use. "
370           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
371           "Valid only for async-finalize = TRUE",
372           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
374       g_param_spec_string ("sink-factory", "Sink factory",
375           "The sink element factory to use (default = filesink). "
376           "Valid only for async-finalize = TRUE",
377           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
379       g_param_spec_boxed ("sink-properties", "Sink properties",
380           "The sink element properties to use. "
381           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
382           "Valid only for async-finalize = TRUE",
383           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
384 
385   /**
386    * GstSplitMuxSink::format-location:
387    * @splitmux: the #GstSplitMuxSink
388    * @fragment_id: the sequence number of the file to be created
389    *
390    * Returns: the location to be used for the next output file
391    */
392   signals[SIGNAL_FORMAT_LOCATION] =
393       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
394       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
395 
396   /**
397    * GstSplitMuxSink::format-location-full:
398    * @splitmux: the #GstSplitMuxSink
399    * @fragment_id: the sequence number of the file to be created
400    * @first_sample: A #GstSample containing the first buffer
401    *   from the reference stream in the new file
402    *
403    * Returns: the location to be used for the next output file
404    */
405   signals[SIGNAL_FORMAT_LOCATION_FULL] =
406       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
407       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
408       GST_TYPE_SAMPLE);
409 
410   /**
411    * GstSplitMuxSink::split-now:
412    * @splitmux: the #GstSplitMuxSink
413    *
414    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
415    * The current GOP will be output to the new file.
416    *
417    * Since: 1.14
418    */
419   signals[SIGNAL_SPLIT_NOW] =
420       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
422           split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
423 
424   /**
425    * GstSplitMuxSink::split-after:
426    * @splitmux: the #GstSplitMuxSink
427    *
428    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
429    * The current GOP will be output to the old file.
430    *
431    * Since: 1.16
432    */
433   signals[SIGNAL_SPLIT_AFTER] =
434       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
435       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
436           split_after), NULL, NULL, NULL, G_TYPE_NONE, 0);
437 
438   /**
439    * GstSplitMuxSink::split-now:
440    * @splitmux: the #GstSplitMuxSink
441    *
442    * When called by the user, this action signal splits the video file (and
443    * begins a new one) as soon as the given running time is reached. If this
444    * action signal is called multiple times, running times are queued up and
445    * processed in the order they were given.
446    *
447    * Note that this is prone to race conditions, where said running time is
448    * reached and surpassed before we had a chance to split. The file will
449    * still split immediately, but in order to make sure that the split doesn't
450    * happen too late, it is recommended to call this action signal from
451    * something that will prevent further buffers from flowing into
452    * splitmuxsink before the split is completed, such as a pad probe before
453    * splitmuxsink.
454    *
455    *
456    * Since: 1.16
457    */
458   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
459       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
460       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
461           split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1,
462       G_TYPE_UINT64);
463 
464   /**
465    * GstSplitMuxSink::muxer-added:
466    * @splitmux: the #GstSplitMuxSink
467    * @muxer: the newly added muxer element
468    *
469    * Since: 1.14
470    */
471   signals[SIGNAL_MUXER_ADDED] =
472       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
473       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
474 
475   /**
476    * GstSplitMuxSink::sink-added:
477    * @splitmux: the #GstSplitMuxSink
478    * @sink: the newly added sink element
479    *
480    * Since: 1.14
481    */
482   signals[SIGNAL_SINK_ADDED] =
483       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
484       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
485 
486   klass->split_now = split_now;
487   klass->split_after = split_after;
488   klass->split_at_running_time = split_at_running_time;
489 }
490 
491 static void
gst_splitmux_sink_init(GstSplitMuxSink * splitmux)492 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
493 {
494   g_mutex_init (&splitmux->lock);
495   g_cond_init (&splitmux->input_cond);
496   g_cond_init (&splitmux->output_cond);
497   g_queue_init (&splitmux->out_cmd_q);
498 
499   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
500   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
501   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
502   splitmux->max_files = DEFAULT_MAX_FILES;
503   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
504   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
505   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
506   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
507   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
508 
509   splitmux->threshold_timecode_str = NULL;
510 
511   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
512   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
513   splitmux->muxer_properties = NULL;
514   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
515   splitmux->sink_properties = NULL;
516 
517   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
518   splitmux->split_requested = FALSE;
519   splitmux->do_split_next_gop = FALSE;
520   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
521 }
522 
523 static void
gst_splitmux_reset(GstSplitMuxSink * splitmux)524 gst_splitmux_reset (GstSplitMuxSink * splitmux)
525 {
526   if (splitmux->muxer) {
527     gst_element_set_locked_state (splitmux->muxer, TRUE);
528     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
529     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
530   }
531   if (splitmux->active_sink) {
532     gst_element_set_locked_state (splitmux->active_sink, TRUE);
533     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
534     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
535   }
536 
537   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
538 }
539 
540 static void
gst_splitmux_sink_dispose(GObject * object)541 gst_splitmux_sink_dispose (GObject * object)
542 {
543   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
544 
545   /* Calling parent dispose invalidates all child pointers */
546   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
547 
548   G_OBJECT_CLASS (parent_class)->dispose (object);
549 }
550 
551 static void
gst_splitmux_sink_finalize(GObject * object)552 gst_splitmux_sink_finalize (GObject * object)
553 {
554   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
555   g_cond_clear (&splitmux->input_cond);
556   g_cond_clear (&splitmux->output_cond);
557   g_mutex_clear (&splitmux->lock);
558   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
559   g_queue_clear (&splitmux->out_cmd_q);
560 
561   if (splitmux->provided_sink)
562     gst_object_unref (splitmux->provided_sink);
563   if (splitmux->provided_muxer)
564     gst_object_unref (splitmux->provided_muxer);
565 
566   if (splitmux->muxer_factory)
567     g_free (splitmux->muxer_factory);
568   if (splitmux->muxer_properties)
569     gst_structure_free (splitmux->muxer_properties);
570   if (splitmux->sink_factory)
571     g_free (splitmux->sink_factory);
572   if (splitmux->sink_properties)
573     gst_structure_free (splitmux->sink_properties);
574 
575   if (splitmux->threshold_timecode_str)
576     g_free (splitmux->threshold_timecode_str);
577 
578   if (splitmux->times_to_split)
579     gst_queue_array_free (splitmux->times_to_split);
580 
581   g_free (splitmux->location);
582 
583   /* Make sure to free any un-released contexts. There should not be any,
584    * because the dispose will have freed all request pads though */
585   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
586   g_list_free (splitmux->contexts);
587 
588   G_OBJECT_CLASS (parent_class)->finalize (object);
589 }
590 
591 /*
592  * Set any time threshold to the muxer, if it has
593  * reserved-max-duration and reserved-duration-remaining
594  * properties. Called when creating/claiming the muxer
595  * in create_elements() */
596 static void
update_muxer_properties(GstSplitMuxSink * sink)597 update_muxer_properties (GstSplitMuxSink * sink)
598 {
599   GObjectClass *klass;
600   GstClockTime threshold_time;
601 
602   sink->muxer_has_reserved_props = FALSE;
603   if (sink->muxer == NULL)
604     return;
605   klass = G_OBJECT_GET_CLASS (sink->muxer);
606   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
607     return;
608   if (g_object_class_find_property (klass,
609           "reserved-duration-remaining") == NULL)
610     return;
611   sink->muxer_has_reserved_props = TRUE;
612 
613   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
614       GST_TIME_ARGS (sink->threshold_time));
615   GST_OBJECT_LOCK (sink);
616   threshold_time = sink->threshold_time;
617   GST_OBJECT_UNLOCK (sink);
618 
619   if (threshold_time > 0) {
620     /* Tell the muxer how much space to reserve */
621     GstClockTime muxer_threshold = threshold_time;
622     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
623   }
624 }
625 
626 static void
gst_splitmux_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)627 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
628     const GValue * value, GParamSpec * pspec)
629 {
630   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
631 
632   switch (prop_id) {
633     case PROP_LOCATION:{
634       GST_OBJECT_LOCK (splitmux);
635       g_free (splitmux->location);
636       splitmux->location = g_value_dup_string (value);
637       GST_OBJECT_UNLOCK (splitmux);
638       break;
639     }
640     case PROP_MAX_SIZE_BYTES:
641       GST_OBJECT_LOCK (splitmux);
642       splitmux->threshold_bytes = g_value_get_uint64 (value);
643       GST_OBJECT_UNLOCK (splitmux);
644       break;
645     case PROP_MAX_SIZE_TIME:
646       GST_OBJECT_LOCK (splitmux);
647       splitmux->threshold_time = g_value_get_uint64 (value);
648       GST_OBJECT_UNLOCK (splitmux);
649       break;
650     case PROP_MAX_SIZE_TIMECODE:
651       GST_OBJECT_LOCK (splitmux);
652       splitmux->threshold_timecode_str = g_value_dup_string (value);
653       GST_OBJECT_UNLOCK (splitmux);
654       break;
655     case PROP_SEND_KEYFRAME_REQUESTS:
656       GST_OBJECT_LOCK (splitmux);
657       splitmux->send_keyframe_requests = g_value_get_boolean (value);
658       GST_OBJECT_UNLOCK (splitmux);
659       break;
660     case PROP_MAX_FILES:
661       GST_OBJECT_LOCK (splitmux);
662       splitmux->max_files = g_value_get_uint (value);
663       GST_OBJECT_UNLOCK (splitmux);
664       break;
665     case PROP_MUXER_OVERHEAD:
666       GST_OBJECT_LOCK (splitmux);
667       splitmux->mux_overhead = g_value_get_double (value);
668       GST_OBJECT_UNLOCK (splitmux);
669       break;
670     case PROP_USE_ROBUST_MUXING:
671       GST_OBJECT_LOCK (splitmux);
672       splitmux->use_robust_muxing = g_value_get_boolean (value);
673       GST_OBJECT_UNLOCK (splitmux);
674       if (splitmux->use_robust_muxing)
675         update_muxer_properties (splitmux);
676       break;
677     case PROP_ALIGNMENT_THRESHOLD:
678       GST_OBJECT_LOCK (splitmux);
679       splitmux->alignment_threshold = g_value_get_uint64 (value);
680       GST_OBJECT_UNLOCK (splitmux);
681       break;
682     case PROP_SINK:
683       GST_OBJECT_LOCK (splitmux);
684       if (splitmux->provided_sink)
685         gst_object_unref (splitmux->provided_sink);
686       splitmux->provided_sink = g_value_get_object (value);
687       gst_object_ref_sink (splitmux->provided_sink);
688       GST_OBJECT_UNLOCK (splitmux);
689       break;
690     case PROP_MUXER:
691       GST_OBJECT_LOCK (splitmux);
692       if (splitmux->provided_muxer)
693         gst_object_unref (splitmux->provided_muxer);
694       splitmux->provided_muxer = g_value_get_object (value);
695       gst_object_ref_sink (splitmux->provided_muxer);
696       GST_OBJECT_UNLOCK (splitmux);
697       break;
698     case PROP_RESET_MUXER:
699       GST_OBJECT_LOCK (splitmux);
700       splitmux->reset_muxer = g_value_get_boolean (value);
701       GST_OBJECT_UNLOCK (splitmux);
702       break;
703     case PROP_ASYNC_FINALIZE:
704       GST_OBJECT_LOCK (splitmux);
705       splitmux->async_finalize = g_value_get_boolean (value);
706       GST_OBJECT_UNLOCK (splitmux);
707       break;
708     case PROP_MUXER_FACTORY:
709       GST_OBJECT_LOCK (splitmux);
710       if (splitmux->muxer_factory)
711         g_free (splitmux->muxer_factory);
712       splitmux->muxer_factory = g_value_dup_string (value);
713       GST_OBJECT_UNLOCK (splitmux);
714       break;
715     case PROP_MUXER_PROPERTIES:
716       GST_OBJECT_LOCK (splitmux);
717       if (splitmux->muxer_properties)
718         gst_structure_free (splitmux->muxer_properties);
719       if (gst_value_get_structure (value))
720         splitmux->muxer_properties =
721             gst_structure_copy (gst_value_get_structure (value));
722       else
723         splitmux->muxer_properties = NULL;
724       GST_OBJECT_UNLOCK (splitmux);
725       break;
726     case PROP_SINK_FACTORY:
727       GST_OBJECT_LOCK (splitmux);
728       if (splitmux->sink_factory)
729         g_free (splitmux->sink_factory);
730       splitmux->sink_factory = g_value_dup_string (value);
731       GST_OBJECT_UNLOCK (splitmux);
732       break;
733     case PROP_SINK_PROPERTIES:
734       GST_OBJECT_LOCK (splitmux);
735       if (splitmux->sink_properties)
736         gst_structure_free (splitmux->sink_properties);
737       if (gst_value_get_structure (value))
738         splitmux->sink_properties =
739             gst_structure_copy (gst_value_get_structure (value));
740       else
741         splitmux->sink_properties = NULL;
742       GST_OBJECT_UNLOCK (splitmux);
743       break;
744     default:
745       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
746       break;
747   }
748 }
749 
750 static void
gst_splitmux_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)751 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
752     GValue * value, GParamSpec * pspec)
753 {
754   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755 
756   switch (prop_id) {
757     case PROP_LOCATION:
758       GST_OBJECT_LOCK (splitmux);
759       g_value_set_string (value, splitmux->location);
760       GST_OBJECT_UNLOCK (splitmux);
761       break;
762     case PROP_MAX_SIZE_BYTES:
763       GST_OBJECT_LOCK (splitmux);
764       g_value_set_uint64 (value, splitmux->threshold_bytes);
765       GST_OBJECT_UNLOCK (splitmux);
766       break;
767     case PROP_MAX_SIZE_TIME:
768       GST_OBJECT_LOCK (splitmux);
769       g_value_set_uint64 (value, splitmux->threshold_time);
770       GST_OBJECT_UNLOCK (splitmux);
771       break;
772     case PROP_MAX_SIZE_TIMECODE:
773       GST_OBJECT_LOCK (splitmux);
774       g_value_set_string (value, splitmux->threshold_timecode_str);
775       GST_OBJECT_UNLOCK (splitmux);
776       break;
777     case PROP_SEND_KEYFRAME_REQUESTS:
778       GST_OBJECT_LOCK (splitmux);
779       g_value_set_boolean (value, splitmux->send_keyframe_requests);
780       GST_OBJECT_UNLOCK (splitmux);
781       break;
782     case PROP_MAX_FILES:
783       GST_OBJECT_LOCK (splitmux);
784       g_value_set_uint (value, splitmux->max_files);
785       GST_OBJECT_UNLOCK (splitmux);
786       break;
787     case PROP_MUXER_OVERHEAD:
788       GST_OBJECT_LOCK (splitmux);
789       g_value_set_double (value, splitmux->mux_overhead);
790       GST_OBJECT_UNLOCK (splitmux);
791       break;
792     case PROP_USE_ROBUST_MUXING:
793       GST_OBJECT_LOCK (splitmux);
794       g_value_set_boolean (value, splitmux->use_robust_muxing);
795       GST_OBJECT_UNLOCK (splitmux);
796       break;
797     case PROP_ALIGNMENT_THRESHOLD:
798       GST_OBJECT_LOCK (splitmux);
799       g_value_set_uint64 (value, splitmux->alignment_threshold);
800       GST_OBJECT_UNLOCK (splitmux);
801       break;
802     case PROP_SINK:
803       GST_OBJECT_LOCK (splitmux);
804       g_value_set_object (value, splitmux->provided_sink);
805       GST_OBJECT_UNLOCK (splitmux);
806       break;
807     case PROP_MUXER:
808       GST_OBJECT_LOCK (splitmux);
809       g_value_set_object (value, splitmux->provided_muxer);
810       GST_OBJECT_UNLOCK (splitmux);
811       break;
812     case PROP_RESET_MUXER:
813       GST_OBJECT_LOCK (splitmux);
814       g_value_set_boolean (value, splitmux->reset_muxer);
815       GST_OBJECT_UNLOCK (splitmux);
816       break;
817     case PROP_ASYNC_FINALIZE:
818       GST_OBJECT_LOCK (splitmux);
819       g_value_set_boolean (value, splitmux->async_finalize);
820       GST_OBJECT_UNLOCK (splitmux);
821       break;
822     case PROP_MUXER_FACTORY:
823       GST_OBJECT_LOCK (splitmux);
824       g_value_set_string (value, splitmux->muxer_factory);
825       GST_OBJECT_UNLOCK (splitmux);
826       break;
827     case PROP_MUXER_PROPERTIES:
828       GST_OBJECT_LOCK (splitmux);
829       gst_value_set_structure (value, splitmux->muxer_properties);
830       GST_OBJECT_UNLOCK (splitmux);
831       break;
832     case PROP_SINK_FACTORY:
833       GST_OBJECT_LOCK (splitmux);
834       g_value_set_string (value, splitmux->sink_factory);
835       GST_OBJECT_UNLOCK (splitmux);
836       break;
837     case PROP_SINK_PROPERTIES:
838       GST_OBJECT_LOCK (splitmux);
839       gst_value_set_structure (value, splitmux->sink_properties);
840       GST_OBJECT_UNLOCK (splitmux);
841       break;
842     default:
843       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
844       break;
845   }
846 }
847 
848 /* Convenience function */
849 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)850 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
851 {
852   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
853 
854   if (GST_CLOCK_TIME_IS_VALID (val)) {
855     gboolean sign =
856         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
857     if (sign > 0)
858       res = val;
859     else if (sign < 0)
860       res = -val;
861   }
862   return res;
863 }
864 
865 static MqStreamCtx *
mq_stream_ctx_new(GstSplitMuxSink * splitmux)866 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
867 {
868   MqStreamCtx *ctx;
869 
870   ctx = g_new0 (MqStreamCtx, 1);
871   ctx->splitmux = splitmux;
872   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
873   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
874   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
875   g_queue_init (&ctx->queued_bufs);
876   return ctx;
877 }
878 
879 static void
mq_stream_ctx_free(MqStreamCtx * ctx)880 mq_stream_ctx_free (MqStreamCtx * ctx)
881 {
882   if (ctx->q) {
883     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
884 
885     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
886 
887     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
888       gst_element_set_locked_state (ctx->q, TRUE);
889       gst_element_set_state (ctx->q, GST_STATE_NULL);
890       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
891       gst_object_unref (parent);
892     }
893     gst_object_unref (ctx->q);
894   }
895   gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
896   gst_object_unref (ctx->sinkpad);
897   gst_object_unref (ctx->srcpad);
898   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
899   g_queue_clear (&ctx->queued_bufs);
900   g_free (ctx);
901 }
902 
903 static void
send_fragment_opened_closed_msg(GstSplitMuxSink * splitmux,gboolean opened,GstElement * sink)904 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
905     GstElement * sink)
906 {
907   gchar *location = NULL;
908   GstMessage *msg;
909   const gchar *msg_name = opened ?
910       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
911   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
912 
913   if (!opened) {
914     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
915     if (rtime)
916       running_time = *rtime;
917   }
918 
919   g_object_get (sink, "location", &location, NULL);
920 
921   /* If it's in the middle of a teardown, the reference_ctc might have become
922    * NULL */
923   if (splitmux->reference_ctx) {
924     msg = gst_message_new_element (GST_OBJECT (splitmux),
925         gst_structure_new (msg_name,
926             "location", G_TYPE_STRING, location,
927             "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
928     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
929   }
930 
931   g_free (location);
932 }
933 
934 static void
send_eos_async(GstSplitMuxSink * splitmux,AsyncEosHelper * helper)935 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
936 {
937   GstEvent *eos;
938   GstPad *pad;
939   MqStreamCtx *ctx;
940 
941   eos = gst_event_new_eos ();
942   pad = helper->pad;
943   ctx = helper->ctx;
944 
945   GST_SPLITMUX_LOCK (splitmux);
946   if (!pad)
947     pad = gst_pad_get_peer (ctx->srcpad);
948   GST_SPLITMUX_UNLOCK (splitmux);
949 
950   gst_pad_send_event (pad, eos);
951   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
952 
953   gst_object_unref (pad);
954   g_free (helper);
955 }
956 
957 /* Called with lock held, drops the lock to send EOS to the
958  * pad
959  */
960 static void
send_eos(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)961 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
962 {
963   GstEvent *eos;
964   GstPad *pad;
965 
966   eos = gst_event_new_eos ();
967   pad = gst_pad_get_peer (ctx->srcpad);
968 
969   ctx->out_eos = TRUE;
970 
971   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
972   GST_SPLITMUX_UNLOCK (splitmux);
973   gst_pad_send_event (pad, eos);
974   GST_SPLITMUX_LOCK (splitmux);
975 
976   gst_object_unref (pad);
977 }
978 
979 /* Called with lock held. Schedules an EOS event to the ctx pad
980  * to happen in another thread */
981 static void
eos_context_async(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)982 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
983 {
984   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
985   GstPad *srcpad, *sinkpad;
986 
987   srcpad = ctx->srcpad;
988   sinkpad = gst_pad_get_peer (srcpad);
989 
990   helper->ctx = ctx;
991   helper->pad = sinkpad;        /* Takes the reference */
992 
993   ctx->out_eos_async_done = TRUE;
994   /* HACK: Here, we explicitly unset the SINK flag on the target sink element
995    * that's about to be asynchronously disposed, so that it no longer
996    * participates in GstBin EOS logic. This fixes a race where if
997    * splitmuxsink really reaches EOS before an asynchronous background
998    * element has finished, then the bin won't actually send EOS to the
999    * pipeline. Even after finishing and removing the old element, the
1000    * bin doesn't re-check EOS status on removing a SINK element. This
1001    * should be fixed in core, making this hack unnecessary. */
1002   GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
1003 
1004   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1005       sinkpad, ctx);
1006 
1007   g_assert_nonnull (helper->pad);
1008   gst_element_call_async (GST_ELEMENT (splitmux),
1009       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1010 }
1011 
1012 /* Called with lock held. TRUE iff all contexts have a
1013  * pending (or delivered) async eos event */
1014 static gboolean
all_contexts_are_async_eos(GstSplitMuxSink * splitmux)1015 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1016 {
1017   gboolean ret = TRUE;
1018   GList *item;
1019 
1020   for (item = splitmux->contexts; item; item = item->next) {
1021     MqStreamCtx *ctx = item->data;
1022     ret &= ctx->out_eos_async_done;
1023   }
1024   return ret;
1025 }
1026 
/* Called with splitmux lock held to check if this output
 * context needs to sleep to wait for the release of the
 * next GOP, or to send EOS to close out the current file
 *
 * The function loops until one of its exit conditions holds:
 *  - the context has a pending caps change (checked once, on entry),
 *  - the context is flushing or the output state is STOPPED,
 *  - the context may output and either no maximum running time is set or
 *    its output running time is still below the (threshold-adjusted)
 *    maximum.
 * Otherwise it acts on the current output state and/or waits on
 * GST_SPLITMUX_WAIT_OUTPUT before re-evaluating.
 * NOTE(review): GST_SPLITMUX_WAIT_OUTPUT is presumably a condition wait
 * that releases the splitmux lock while sleeping - confirm against the
 * macro definition.
 */
static void
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
  if (ctx->caps_change)
    return;

  do {
    /* When first starting up, the reference stream has to output
     * the first buffer to prepare the muxer and sink */
    gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
    GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;

    /* Pull the limit for this context earlier by alignment_threshold, but
     * only when both values are set and the limit is not smaller than the
     * threshold */
    if (!(splitmux->max_out_running_time == 0 ||
            splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
            splitmux->alignment_threshold == 0 ||
            splitmux->max_out_running_time < splitmux->alignment_threshold)) {
      my_max_out_running_time -= splitmux->alignment_threshold;
      GST_LOG_OBJECT (ctx->srcpad,
          "Max out running time currently %" GST_STIME_FORMAT
          ", with threshold applied it is %" GST_STIME_FORMAT,
          GST_STIME_ARGS (splitmux->max_out_running_time),
          GST_STIME_ARGS (my_max_out_running_time));
    }

    /* Re-checked on every iteration: both can change while we were waiting */
    if (ctx->flushing
        || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
      return;

    GST_LOG_OBJECT (ctx->srcpad,
        "Checking running time %" GST_STIME_FORMAT " against max %"
        GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
        GST_STIME_ARGS (my_max_out_running_time));

    if (can_output) {
      /* Still inside the released range (or no limit at all): let the
       * caller pass its data through */
      if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
          ctx->out_running_time < my_max_out_running_time) {
        return;
      }

      switch (splitmux->output_state) {
        case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
          /* We only get here if we've finished outputting a GOP and need to know
           * what to do next */
          splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
          GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
          continue;

        case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
          /* We've reached the max out running_time to get here, so end this file now */
          if (ctx->out_eos == FALSE) {
            if (splitmux->async_finalize) {
              /* We must set EOS asynchronously at this point. We cannot defer
               * it, because we need all contexts to wake up, for the
               * reference context to eventually give us something at
               * START_NEXT_FILE. Otherwise, collectpads might choose another
               * context to give us the first buffer, and format-location-full
               * will not contain a valid sample. */
              g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
                  GINT_TO_POINTER (1));
              eos_context_async (ctx, splitmux);
              if (all_contexts_are_async_eos (splitmux)) {
                GST_INFO_OBJECT (splitmux,
                    "All contexts are async_eos. Moving to the next file.");
                /* We can start the next file once we've asked each pad to go EOS */
                splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
                GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
                continue;
              }
              /* Not every context is done yet: leave the switch and wait
               * below */
            } else {
              /* Synchronous EOS; send_eos() drops and re-takes the lock */
              send_eos (splitmux, ctx);
              continue;
            }
          } else {
            GST_INFO_OBJECT (splitmux,
                "At end-of-file state, but context %p is already EOS", ctx);
          }
          break;
        case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
          if (ctx->is_reference) {
            /* Special handling on the reference ctx to start new fragments
             * and collect commands from the command queue */
            /* drops the splitmux lock briefly: */
            /* We must have reference ctx in order for format-location-full to
             * have a sample */
            start_next_fragment (splitmux, ctx);
            continue;
          }
          /* Non-reference contexts leave the switch and wait below */
          break;
        case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
          /* Take the next command from the tail of out_cmd_q, waiting for
           * one to arrive for as long as the state stays AWAITING_COMMAND */
          do {
            SplitMuxOutputCommand *cmd =
                g_queue_pop_tail (&splitmux->out_cmd_q);
            if (cmd != NULL) {
              /* If we pop the last command, we need to make our queues bigger */
              if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
                grow_blocked_queues (splitmux);

              if (cmd->start_new_fragment) {
                GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
                splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
              } else {
                GST_DEBUG_OBJECT (splitmux,
                    "Got new output cmd for time %" GST_STIME_FORMAT,
                    GST_STIME_ARGS (cmd->max_output_ts));

                /* Extend the output range immediately */
                splitmux->max_out_running_time = cmd->max_output_ts;
                splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
              }
              GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);

              out_cmd_buf_free (cmd);
              break;
            } else {
              GST_SPLITMUX_WAIT_OUTPUT (splitmux);
              /* ohos.ext.func.0008
               * On GST_EVENT_FLUSH_START, ctx->flushing is set to TRUE and
               * GST_SPLITMUX_BROADCAST_OUTPUT (splitmux) is called.
               * This loop must break when flushing is set; the flushing
               * check at the top of the outer loop then returns.
               */
#ifdef OHOS_EXT_FUNC
              if (ctx->flushing) {
                GST_DEBUG_OBJECT (ctx->srcpad, "splitmuxsink is flushing, break.");
                break;
              }
#endif
            }
          } while (splitmux->output_state ==
              SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
          /* loop and re-check the state */
          continue;
        }
        case SPLITMUX_OUTPUT_STATE_STOPPED:
          return;
      }
    }

    /* Reached when this context may not output yet, or when the state
     * handling above chose to wait rather than re-loop */
    GST_INFO_OBJECT (ctx->srcpad,
        "Sleeping for running time %"
        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
        GST_STIME_ARGS (ctx->out_running_time),
        GST_STIME_ARGS (splitmux->max_out_running_time));
    GST_SPLITMUX_WAIT_OUTPUT (splitmux);
    GST_INFO_OBJECT (ctx->srcpad,
        "Woken for new max running time %" GST_STIME_FORMAT,
        GST_STIME_ARGS (splitmux->max_out_running_time));
  }
  while (1);
}
1180 
1181 static GstClockTime
calculate_next_max_timecode(GstSplitMuxSink * splitmux,const GstVideoTimeCode * cur_tc)1182 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1183     const GstVideoTimeCode * cur_tc)
1184 {
1185   GstVideoTimeCode *target_tc;
1186   GstVideoTimeCodeInterval *tc_inter;
1187   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1188 
1189   if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1190     return GST_CLOCK_TIME_NONE;
1191 
1192   tc_inter =
1193       gst_video_time_code_interval_new_from_string
1194       (splitmux->threshold_timecode_str);
1195   target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1196   gst_video_time_code_interval_free (tc_inter);
1197 
1198   /* Convert to ns */
1199   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1200   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1201 
1202   /* Add fragment_start_time, accounting for wraparound */
1203   if (target_tc_time >= cur_tc_time) {
1204     next_max_tc_time =
1205         target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1206   } else {
1207     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1208 
1209     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1210         (cur_tc->config.fps_d == 1001)) {
1211       /* Checking fps_d is probably unneeded, but better safe than sorry
1212        * (e.g. someone accidentally set a flag) */
1213       GstVideoTimeCode *tc_for_offset;
1214 
1215       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1216        * but slightly less. Calculate that duration from a fake timecode. The
1217        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1218        * is to add one frame to 23:59:59;29 */
1219       tc_for_offset =
1220           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1221           NULL, cur_tc->config.flags, 23, 59, 59,
1222           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1223       day_in_ns =
1224           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1225           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1226           cur_tc->config.fps_n);
1227       gst_video_time_code_free (tc_for_offset);
1228     }
1229     next_max_tc_time =
1230         day_in_ns - cur_tc_time + target_tc_time +
1231         splitmux->fragment_start_time;
1232   }
1233 
1234   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1235       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1236       GST_TIME_ARGS (cur_tc_time));
1237   gst_video_time_code_free (target_tc);
1238 
1239   return next_max_tc_time;
1240 }
1241 
1242 static gboolean
request_next_keyframe(GstSplitMuxSink * splitmux,GstBuffer * buffer)1243 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1244 {
1245   GstEvent *ev;
1246   GstClockTime target_time;
1247   gboolean timecode_based = FALSE;
1248 
1249   splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1250   if (splitmux->threshold_timecode_str) {
1251     GstVideoTimeCodeMeta *tc_meta;
1252 
1253     if (buffer != NULL) {
1254       tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1255       if (tc_meta) {
1256         splitmux->next_max_tc_time =
1257             calculate_next_max_timecode (splitmux, &tc_meta->tc);
1258         timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1259       }
1260     } else {
1261       /* This can happen in the presence of GAP events that trigger
1262        * a new fragment start */
1263       GST_WARNING_OBJECT (splitmux,
1264           "No buffer available to calculate next timecode");
1265     }
1266   }
1267 
1268   if (splitmux->send_keyframe_requests == FALSE
1269       || (splitmux->threshold_time == 0 && !timecode_based)
1270       || splitmux->threshold_bytes != 0)
1271     return TRUE;
1272 
1273   if (timecode_based) {
1274     /* We might have rounding errors: aim slightly earlier */
1275     target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1276   } else {
1277     target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1278   }
1279   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1280   GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1281       GST_TIME_ARGS (target_time));
1282   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1283 }
1284 
1285 static GstPadProbeReturn
handle_mq_output(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)1286 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1287 {
1288   GstSplitMuxSink *splitmux = ctx->splitmux;
1289   MqStreamBuf *buf_info = NULL;
1290 
1291   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1292 
1293   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1294   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1295     g_warning ("Buffer list handling not implemented");
1296     return GST_PAD_PROBE_DROP;
1297   }
1298   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1299       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1300     GstEvent *event = gst_pad_probe_info_get_event (info);
1301     gboolean locked = FALSE;
1302 
1303     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1304 
1305     switch (GST_EVENT_TYPE (event)) {
1306       case GST_EVENT_SEGMENT:
1307         gst_event_copy_segment (event, &ctx->out_segment);
1308         break;
1309       case GST_EVENT_FLUSH_STOP:
1310         GST_SPLITMUX_LOCK (splitmux);
1311         locked = TRUE;
1312         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1313         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1314         g_queue_clear (&ctx->queued_bufs);
1315         ctx->flushing = FALSE;
1316         break;
1317       case GST_EVENT_FLUSH_START:
1318         GST_SPLITMUX_LOCK (splitmux);
1319         locked = TRUE;
1320         GST_LOG_OBJECT (pad, "Flush start");
1321         ctx->flushing = TRUE;
1322         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1323         break;
1324       case GST_EVENT_EOS:
1325         GST_SPLITMUX_LOCK (splitmux);
1326         locked = TRUE;
1327         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1328           goto beach;
1329         ctx->out_eos = TRUE;
1330         GST_INFO_OBJECT (splitmux,
1331             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1332         break;
1333       case GST_EVENT_GAP:{
1334         GstClockTime gap_ts;
1335         GstClockTimeDiff rtime;
1336 
1337         gst_event_parse_gap (event, &gap_ts, NULL);
1338         if (gap_ts == GST_CLOCK_TIME_NONE)
1339           break;
1340 
1341         GST_SPLITMUX_LOCK (splitmux);
1342         locked = TRUE;
1343 
1344         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1345           goto beach;
1346 
1347         /* When we get a gap event on the
1348          * reference stream and we're trying to open a
1349          * new file, we need to store it until we get
1350          * the buffer afterwards
1351          */
1352         if (ctx->is_reference &&
1353             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1354           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1355           gst_event_replace (&ctx->pending_gap, event);
1356           GST_SPLITMUX_UNLOCK (splitmux);
1357           return GST_PAD_PROBE_HANDLED;
1358         }
1359 
1360         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1361 
1362         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1363             GST_STIME_ARGS (rtime));
1364 
1365         if (rtime != GST_CLOCK_STIME_NONE) {
1366           ctx->out_running_time = rtime;
1367           complete_or_wait_on_out (splitmux, ctx);
1368         }
1369         break;
1370       }
1371       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1372         const GstStructure *s;
1373         GstClockTimeDiff ts = 0;
1374 
1375         s = gst_event_get_structure (event);
1376         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1377           break;
1378 
1379         gst_structure_get_int64 (s, "timestamp", &ts);
1380 
1381         GST_SPLITMUX_LOCK (splitmux);
1382         locked = TRUE;
1383 
1384         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1385           goto beach;
1386         ctx->out_running_time = ts;
1387         if (!ctx->is_reference)
1388           complete_or_wait_on_out (splitmux, ctx);
1389         GST_SPLITMUX_UNLOCK (splitmux);
1390         return GST_PAD_PROBE_DROP;
1391       }
1392       case GST_EVENT_CAPS:{
1393         GstPad *peer;
1394 
1395         if (!ctx->is_reference)
1396           break;
1397 
1398         peer = gst_pad_get_peer (pad);
1399         if (peer) {
1400           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1401 
1402           gst_object_unref (peer);
1403 
1404           if (ok)
1405             break;
1406 
1407         } else {
1408           break;
1409         }
1410         /* This is in the case the muxer doesn't allow this change of caps */
1411         GST_SPLITMUX_LOCK (splitmux);
1412         locked = TRUE;
1413         ctx->caps_change = TRUE;
1414 
1415         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1416           GST_DEBUG_OBJECT (splitmux,
1417               "New caps were not accepted. Switching output file");
1418           if (ctx->out_eos == FALSE) {
1419             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1420             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1421           }
1422         }
1423 
1424         /* Lets it fall through, if it fails again, then the muxer just can't
1425          * support this format, but at least we have a closed file.
1426          */
1427         break;
1428       }
1429       default:
1430         break;
1431     }
1432 
1433     /* We need to make sure events aren't passed
1434      * until the muxer / sink are ready for it */
1435     if (!locked)
1436       GST_SPLITMUX_LOCK (splitmux);
1437 /* ohos.ext.func.0008
1438  * when GST_EVENT_FLUSH_STOP, there is a very low probability freeze.
1439  * FLUSH_START and FLUSH_STOP should not wait for cmd in complete_or_wait_on_out.
1440  * cause upstream component will be block if these two EVENT wait in this function.
1441  */
1442 #ifdef OHOS_EXT_FUNC
1443     if (!ctx->is_reference && (GST_EVENT_TYPE(event) != GST_EVENT_FLUSH_START) &&
1444       (GST_EVENT_TYPE(event) != GST_EVENT_FLUSH_STOP))
1445 #else
1446     if (!ctx->is_reference)
1447 #endif
1448       complete_or_wait_on_out (splitmux, ctx);
1449     GST_SPLITMUX_UNLOCK (splitmux);
1450 
1451     /* Don't try to forward sticky events before the next buffer is there
1452      * because it would cause a new file to be created without the first
1453      * buffer being available.
1454      */
1455     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1456       gst_event_unref (event);
1457       return GST_PAD_PROBE_HANDLED;
1458     } else
1459       return GST_PAD_PROBE_PASS;
1460   }
1461 
1462   /* Allow everything through until the configured next stopping point */
1463   GST_SPLITMUX_LOCK (splitmux);
1464 
1465   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1466   if (buf_info == NULL)
1467     /* Can only happen due to a poorly timed flush */
1468     goto beach;
1469 
1470   /* If we have popped a keyframe, decrement the queued_gop count */
1471   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1472     splitmux->queued_keyframes--;
1473 
1474   ctx->out_running_time = buf_info->run_ts;
1475   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1476 
1477   GST_LOG_OBJECT (splitmux,
1478       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1479       " size %" G_GUINT64_FORMAT,
1480       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1481 
1482   ctx->caps_change = FALSE;
1483 
1484   complete_or_wait_on_out (splitmux, ctx);
1485 
1486   splitmux->muxed_out_bytes += buf_info->buf_size;
1487 
1488 #ifndef GST_DISABLE_GST_DEBUG
1489   {
1490     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1491     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1492         " run ts %" GST_STIME_FORMAT, buf,
1493         GST_STIME_ARGS (ctx->out_running_time));
1494   }
1495 #endif
1496 
1497   ctx->cur_out_buffer = NULL;
1498   GST_SPLITMUX_UNLOCK (splitmux);
1499 
1500   /* pending_gap is protected by the STREAM lock */
1501   if (ctx->pending_gap) {
1502     /* If we previously stored a gap event, send it now */
1503     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1504 
1505     GST_DEBUG_OBJECT (splitmux,
1506         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1507 
1508     gst_pad_send_event (peer, ctx->pending_gap);
1509     ctx->pending_gap = NULL;
1510 
1511     gst_object_unref (peer);
1512   }
1513 
1514   mq_stream_buf_free (buf_info);
1515 
1516   return GST_PAD_PROBE_PASS;
1517 
1518 beach:
1519   GST_SPLITMUX_UNLOCK (splitmux);
1520   return GST_PAD_PROBE_DROP;
1521 }
1522 
1523 static gboolean
resend_sticky(GstPad * pad,GstEvent ** event,GstPad * peer)1524 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1525 {
1526   return gst_pad_send_event (peer, gst_event_ref (*event));
1527 }
1528 
1529 static void
unlock_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1530 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1531 {
1532   if (ctx->fragment_block_id > 0) {
1533     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1534     ctx->fragment_block_id = 0;
1535   }
1536 }
1537 
1538 static void
restart_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1539 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1540 {
1541   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1542 
1543   gst_pad_sticky_events_foreach (ctx->srcpad,
1544       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1545 
1546   /* Clear EOS flag if not actually EOS */
1547   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1548   ctx->out_eos_async_done = ctx->out_eos;
1549 
1550   gst_object_unref (peer);
1551 }
1552 
1553 static void
relink_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1554 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1555 {
1556   GstPad *sinkpad, *srcpad, *newpad;
1557   GstPadTemplate *templ;
1558 
1559   srcpad = ctx->srcpad;
1560   sinkpad = gst_pad_get_peer (srcpad);
1561 
1562   templ = sinkpad->padtemplate;
1563   newpad =
1564       gst_element_request_pad (splitmux->muxer, templ,
1565       GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1566 
1567   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1568       newpad);
1569   if (!gst_pad_unlink (srcpad, sinkpad)) {
1570     gst_object_unref (sinkpad);
1571     goto fail;
1572   }
1573   if (gst_pad_link_full (srcpad, newpad,
1574           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1575     gst_element_release_request_pad (splitmux->muxer, newpad);
1576     gst_object_unref (sinkpad);
1577     gst_object_unref (newpad);
1578     goto fail;
1579   }
1580   gst_object_unref (newpad);
1581   gst_object_unref (sinkpad);
1582   return;
1583 
1584 fail:
1585   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1586       ("Could not create the new muxer/sink"), NULL);
1587 }
1588 
1589 static GstPadProbeReturn
_block_pad(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1590 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1591 {
1592   return GST_PAD_PROBE_OK;
1593 }
1594 
1595 static void
block_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1596 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1597 {
1598   ctx->fragment_block_id =
1599       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1600       NULL, NULL);
1601 }
1602 
1603 static gboolean
_set_property_from_structure(GQuark field_id,const GValue * value,gpointer user_data)1604 _set_property_from_structure (GQuark field_id, const GValue * value,
1605     gpointer user_data)
1606 {
1607   const gchar *property_name = g_quark_to_string (field_id);
1608   GObject *element = G_OBJECT (user_data);
1609 
1610   g_object_set_property (element, property_name, value);
1611 
1612   return TRUE;
1613 }
1614 
1615 static void
_lock_and_set_to_null(GstElement * element,GstSplitMuxSink * splitmux)1616 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1617 {
1618   gst_element_set_locked_state (element, TRUE);
1619   gst_element_set_state (element, GST_STATE_NULL);
1620   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1621   gst_bin_remove (GST_BIN (splitmux), element);
1622 }
1623 
1624 
1625 static void
_send_event(const GValue * value,gpointer user_data)1626 _send_event (const GValue * value, gpointer user_data)
1627 {
1628   GstPad *pad = g_value_get_object (value);
1629   GstEvent *ev = user_data;
1630 
1631   gst_pad_send_event (pad, gst_event_ref (ev));
1632 }
1633 
1634 /* Called with lock held when a fragment
1635  * reaches EOS and it is time to restart
1636  * a new fragment
1637  */
1638 static void
start_next_fragment(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)1639 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1640 {
1641   GstElement *muxer, *sink;
1642 
1643   g_assert (ctx->is_reference);
1644 
1645   /* 1 change to new file */
1646   splitmux->switching_fragment = TRUE;
1647 
1648   /* We need to drop the splitmux lock to acquire the state lock
1649    * here and ensure there's no racy state change going on elsewhere */
1650   muxer = gst_object_ref (splitmux->muxer);
1651   sink = gst_object_ref (splitmux->active_sink);
1652 
1653   GST_SPLITMUX_UNLOCK (splitmux);
1654   GST_STATE_LOCK (splitmux);
1655 
1656   if (splitmux->async_finalize) {
1657     if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1658       gchar *newname;
1659       GstElement *new_sink, *new_muxer;
1660 
1661       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1662           splitmux->fragment_id);
1663       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1664       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1665       GST_SPLITMUX_LOCK (splitmux);
1666       if ((splitmux->sink =
1667               create_element (splitmux, splitmux->sink_factory, newname,
1668                   TRUE)) == NULL)
1669         goto fail;
1670       if (splitmux->sink_properties)
1671         gst_structure_foreach (splitmux->sink_properties,
1672             _set_property_from_structure, splitmux->sink);
1673       splitmux->active_sink = splitmux->sink;
1674       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1675       g_free (newname);
1676       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1677       if ((splitmux->muxer =
1678               create_element (splitmux, splitmux->muxer_factory, newname,
1679                   TRUE)) == NULL)
1680         goto fail;
1681       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1682               "async") != NULL) {
1683         /* async child elements are causing state change races and weird
1684          * failures, so let's try and turn that off */
1685         g_object_set (splitmux->sink, "async", FALSE, NULL);
1686       }
1687       if (splitmux->muxer_properties)
1688         gst_structure_foreach (splitmux->muxer_properties,
1689             _set_property_from_structure, splitmux->muxer);
1690       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1691       g_free (newname);
1692       new_sink = splitmux->sink;
1693       new_muxer = splitmux->muxer;
1694       GST_SPLITMUX_UNLOCK (splitmux);
1695       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1696       gst_element_link (new_muxer, new_sink);
1697 
1698       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1699         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1700                     EOS_FROM_US)) == 2) {
1701           _lock_and_set_to_null (muxer, splitmux);
1702           _lock_and_set_to_null (sink, splitmux);
1703         } else {
1704           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1705               GINT_TO_POINTER (2));
1706         }
1707       }
1708       gst_object_unref (muxer);
1709       gst_object_unref (sink);
1710       muxer = new_muxer;
1711       sink = new_sink;
1712       gst_object_ref (muxer);
1713       gst_object_ref (sink);
1714     }
1715   } else {
1716 
1717     gst_element_set_locked_state (muxer, TRUE);
1718     gst_element_set_locked_state (sink, TRUE);
1719     gst_element_set_state (sink, GST_STATE_NULL);
1720 
1721     if (splitmux->reset_muxer) {
1722       gst_element_set_state (muxer, GST_STATE_NULL);
1723     } else {
1724       GstIterator *it = gst_element_iterate_sink_pads (muxer);
1725       GstEvent *ev;
1726 
1727       ev = gst_event_new_flush_start ();
1728       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1729       gst_event_unref (ev);
1730 
1731       gst_iterator_resync (it);
1732 
1733       ev = gst_event_new_flush_stop (TRUE);
1734       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1735       gst_event_unref (ev);
1736 
1737       gst_iterator_free (it);
1738     }
1739   }
1740 
1741   GST_SPLITMUX_LOCK (splitmux);
1742   if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1743     set_next_filename (splitmux, ctx);
1744   splitmux->muxed_out_bytes = 0;
1745   GST_SPLITMUX_UNLOCK (splitmux);
1746 
1747   gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1748   gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1749   gst_element_set_locked_state (muxer, FALSE);
1750   gst_element_set_locked_state (sink, FALSE);
1751 
1752   gst_object_unref (sink);
1753   gst_object_unref (muxer);
1754 
1755   GST_SPLITMUX_LOCK (splitmux);
1756   GST_STATE_UNLOCK (splitmux);
1757   splitmux->switching_fragment = FALSE;
1758   do_async_done (splitmux);
1759 
1760   splitmux->ready_for_output = TRUE;
1761 
1762   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1763   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1764 
1765   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1766 
1767   /* FIXME: Is this always the correct next state? */
1768   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1769   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1770   return;
1771 
1772 fail:
1773   GST_STATE_UNLOCK (splitmux);
1774   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1775       ("Could not create the new muxer/sink"), NULL);
1776 }
1777 
1778 static void
bus_handler(GstBin * bin,GstMessage * message)1779 bus_handler (GstBin * bin, GstMessage * message)
1780 {
1781   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1782 
1783   switch (GST_MESSAGE_TYPE (message)) {
1784     case GST_MESSAGE_EOS:{
1785       /* If the state is draining out the current file, drop this EOS */
1786       GstElement *sink;
1787 
1788       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1789       GST_SPLITMUX_LOCK (splitmux);
1790 
1791       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1792 
1793       if (splitmux->async_finalize) {
1794 
1795         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1796           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1797                       EOS_FROM_US)) == 2) {
1798             GstElement *muxer;
1799             GstPad *sinksink, *muxersrc;
1800 
1801             sinksink = gst_element_get_static_pad (sink, "sink");
1802             muxersrc = gst_pad_get_peer (sinksink);
1803             muxer = gst_pad_get_parent_element (muxersrc);
1804             gst_object_unref (sinksink);
1805             gst_object_unref (muxersrc);
1806 
1807             gst_element_call_async (muxer,
1808                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1809                 gst_object_ref (splitmux), gst_object_unref);
1810             gst_element_call_async (sink,
1811                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1812                 gst_object_ref (splitmux), gst_object_unref);
1813             gst_object_unref (muxer);
1814           } else {
1815             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1816                 GINT_TO_POINTER (2));
1817           }
1818           GST_DEBUG_OBJECT (splitmux,
1819               "Caught async EOS from previous muxer+sink. Dropping.");
1820           /* We forward the EOS so that it gets aggregated as normal. If the sink
1821            * finishes and is removed before the end, it will be de-aggregated */
1822           gst_message_unref (message);
1823           GST_SPLITMUX_UNLOCK (splitmux);
1824           return;
1825         }
1826       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1827         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1828         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1829         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1830 
1831         gst_message_unref (message);
1832         GST_SPLITMUX_UNLOCK (splitmux);
1833         return;
1834       } else {
1835         GST_DEBUG_OBJECT (splitmux,
1836             "Passing EOS message. Output state %d max_out_running_time %"
1837             GST_STIME_FORMAT, splitmux->output_state,
1838             GST_STIME_ARGS (splitmux->max_out_running_time));
1839       }
1840       GST_SPLITMUX_UNLOCK (splitmux);
1841       break;
1842     }
1843     case GST_MESSAGE_ASYNC_START:
1844     case GST_MESSAGE_ASYNC_DONE:
1845       /* Ignore state changes from our children while switching */
1846       GST_SPLITMUX_LOCK (splitmux);
1847       if (splitmux->switching_fragment) {
1848         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1849             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1850           GST_LOG_OBJECT (splitmux,
1851               "Ignoring state change from child %" GST_PTR_FORMAT
1852               " while switching", GST_MESSAGE_SRC (message));
1853           gst_message_unref (message);
1854           GST_SPLITMUX_UNLOCK (splitmux);
1855           return;
1856         }
1857       }
1858       GST_SPLITMUX_UNLOCK (splitmux);
1859       break;
1860     case GST_MESSAGE_WARNING:
1861     {
1862       GError *gerror = NULL;
1863 
1864       gst_message_parse_warning (message, &gerror, NULL);
1865 
1866       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1867         GList *item;
1868         gboolean caps_change = FALSE;
1869 
1870         GST_SPLITMUX_LOCK (splitmux);
1871 
1872         for (item = splitmux->contexts; item; item = item->next) {
1873           MqStreamCtx *ctx = item->data;
1874 
1875           if (ctx->caps_change) {
1876             caps_change = TRUE;
1877             break;
1878           }
1879         }
1880 
1881         GST_SPLITMUX_UNLOCK (splitmux);
1882 
1883         if (caps_change) {
1884           GST_LOG_OBJECT (splitmux,
1885               "Ignoring warning change from child %" GST_PTR_FORMAT
1886               " while switching caps", GST_MESSAGE_SRC (message));
1887           gst_message_unref (message);
1888           return;
1889         }
1890       }
1891       break;
1892     }
1893     default:
1894       break;
1895   }
1896 
1897   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1898 }
1899 
1900 static void
ctx_set_unblock(MqStreamCtx * ctx)1901 ctx_set_unblock (MqStreamCtx * ctx)
1902 {
1903   ctx->need_unblock = TRUE;
1904 }
1905 
1906 static gboolean
need_new_fragment(GstSplitMuxSink * splitmux,GstClockTime queued_time,GstClockTime queued_gop_time,guint64 queued_bytes)1907 need_new_fragment (GstSplitMuxSink * splitmux,
1908     GstClockTime queued_time, GstClockTime queued_gop_time,
1909     guint64 queued_bytes)
1910 {
1911   guint64 thresh_bytes;
1912   GstClockTime thresh_time;
1913   gboolean check_robust_muxing;
1914   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
1915   GstClockTime *ptr_to_time;
1916 
1917   GST_OBJECT_LOCK (splitmux);
1918   thresh_bytes = splitmux->threshold_bytes;
1919   thresh_time = splitmux->threshold_time;
1920   ptr_to_time = (GstClockTime *)
1921       gst_queue_array_peek_head_struct (splitmux->times_to_split);
1922   if (ptr_to_time)
1923     time_to_split = *ptr_to_time;
1924   check_robust_muxing = splitmux->use_robust_muxing
1925       && splitmux->muxer_has_reserved_props;
1926   GST_OBJECT_UNLOCK (splitmux);
1927 
1928   /* Have we muxed anything into the new file at all? */
1929   if (splitmux->fragment_total_bytes <= 0)
1930     return FALSE;
1931 
1932   /* User told us to split now */
1933   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE)
1934     return TRUE;
1935 
1936   /* User told us to split at this running time */
1937   if (splitmux->reference_ctx->in_running_time > time_to_split) {
1938     GST_OBJECT_LOCK (splitmux);
1939     /* Dequeue running time */
1940     gst_queue_array_pop_head_struct (splitmux->times_to_split);
1941     /* Empty any running times after this that are past now */
1942     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1943     while (ptr_to_time) {
1944       time_to_split = *ptr_to_time;
1945       if (splitmux->reference_ctx->in_running_time <= time_to_split) {
1946         break;
1947       }
1948       gst_queue_array_pop_head_struct (splitmux->times_to_split);
1949       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1950     }
1951     GST_OBJECT_UNLOCK (splitmux);
1952     return TRUE;
1953   }
1954 
1955   if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1956     return TRUE;                /* Would overrun byte limit */
1957 
1958   if (thresh_time > 0 && queued_time > thresh_time)
1959     return TRUE;                /* Would overrun byte limit */
1960 
1961   /* Timecode-based threshold accounts for possible rounding errors:
1962    * 5us should be bigger than all possible rounding errors but nowhere near
1963    * big enough to skip to another frame */
1964   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1965       splitmux->reference_ctx->in_running_time >
1966       splitmux->next_max_tc_time + 5 * GST_USECOND)
1967     return TRUE;                /* Timecode threshold */
1968 
1969   if (check_robust_muxing) {
1970     GstClockTime mux_reserved_remain;
1971 
1972     g_object_get (splitmux->muxer,
1973         "reserved-duration-remaining", &mux_reserved_remain, NULL);
1974 
1975     GST_LOG_OBJECT (splitmux,
1976         "Muxer robust muxing report - %" G_GUINT64_FORMAT
1977         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1978         mux_reserved_remain, queued_gop_time);
1979 
1980     if (queued_gop_time >= mux_reserved_remain) {
1981       GST_INFO_OBJECT (splitmux,
1982           "File is about to run out of header room - %" G_GUINT64_FORMAT
1983           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1984           ". Switching to new file", mux_reserved_remain, queued_gop_time);
1985       return TRUE;
1986     }
1987   }
1988 
1989   /* Continue and mux this GOP */
1990   return FALSE;
1991 }
1992 
1993 /* Called with splitmux lock held */
1994 /* Called when entering ProcessingCompleteGop state
1995  * Assess if mq contents overflowed the current file
1996  *   -> If yes, need to switch to new file
1997  *   -> if no, set max_out_running_time to let this GOP in and
1998  *      go to COLLECTING_GOP_START state
1999  */
2000 static void
handle_gathered_gop(GstSplitMuxSink * splitmux)2001 handle_gathered_gop (GstSplitMuxSink * splitmux)
2002 {
2003   guint64 queued_bytes;
2004   GstClockTimeDiff queued_time = 0;
2005   GstClockTimeDiff queued_gop_time = 0;
2006   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2007   SplitMuxOutputCommand *cmd;
2008 
2009   /* Assess if the multiqueue contents overflowed the current file */
2010   /* When considering if a newly gathered GOP overflows
2011    * the time limit for the file, only consider the running time of the
2012    * reference stream. Other streams might have run ahead a little bit,
2013    * but extra pieces won't be released to the muxer beyond the reference
2014    * stream cut-off anyway - so it forms the limit. */
2015   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2016   queued_time = splitmux->reference_ctx->in_running_time;
2017   /* queued_gop_time tracks how much unwritten data there is waiting to
2018    * be written to this fragment including this GOP */
2019   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2020     queued_gop_time =
2021         splitmux->reference_ctx->in_running_time -
2022         splitmux->reference_ctx->out_running_time;
2023   else
2024     queued_gop_time =
2025         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2026 
2027   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2028 
2029   g_assert (queued_gop_time >= 0);
2030   g_assert (queued_time >= splitmux->fragment_start_time);
2031 
2032   queued_time -= splitmux->fragment_start_time;
2033   if (queued_time < queued_gop_time)
2034     queued_gop_time = queued_time;
2035 
2036   /* Expand queued bytes estimate by muxer overhead */
2037   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2038 
2039   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2040       " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
2041   if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
2042     GST_LOG_OBJECT (splitmux,
2043         "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
2044         GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
2045         GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
2046   }
2047 
2048   /* Check for overrun - have we output at least one byte and overrun
2049    * either threshold? */
2050   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2051     if (splitmux->async_finalize) {
2052       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2053       *sink_running_time = splitmux->reference_ctx->out_running_time;
2054       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2055           RUNNING_TIME, sink_running_time, g_free);
2056     }
2057     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2058     /* Tell the output side to start a new fragment */
2059     GST_INFO_OBJECT (splitmux,
2060         "This GOP (dur %" GST_STIME_FORMAT
2061         ") would overflow the fragment, Sending start_new_fragment cmd",
2062         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2063             splitmux->gop_start_time));
2064     cmd = out_cmd_buf_new ();
2065     cmd->start_new_fragment = TRUE;
2066     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2067     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2068 
2069     new_out_ts = splitmux->reference_ctx->in_running_time;
2070     splitmux->fragment_start_time = splitmux->gop_start_time;
2071     splitmux->fragment_total_bytes = 0;
2072 
2073     if (request_next_keyframe (splitmux,
2074             splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
2075       GST_WARNING_OBJECT (splitmux,
2076           "Could not request a keyframe. Files may not split at the exact location they should");
2077     }
2078     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2079   }
2080 
2081   /* And set up to collect the next GOP */
2082   if (!splitmux->reference_ctx->in_eos) {
2083     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2084     splitmux->gop_start_time = new_out_ts;
2085   } else {
2086     /* This is probably already the current state, but just in case: */
2087     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2088     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
2089   }
2090 
2091   /* And wake all input contexts to send a wake-up event */
2092   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2093   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2094 
2095   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2096   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2097 
2098   if (splitmux->gop_total_bytes > 0) {
2099     GST_LOG_OBJECT (splitmux,
2100         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2101         " time %" GST_STIME_FORMAT,
2102         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2103 
2104     /* Send this GOP to the output command queue */
2105     cmd = out_cmd_buf_new ();
2106     cmd->start_new_fragment = FALSE;
2107     cmd->max_output_ts = new_out_ts;
2108     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2109         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2110     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2111 
2112     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2113   }
2114 
2115   splitmux->gop_total_bytes = 0;
2116 }
2117 
/* Called with splitmux lock held */
/* Called from each input pad when it has all the pieces
 * for a GOP or EOS, starting with the reference pad which has set the
 * splitmux->max_in_running_time
 *
 * The splitmux lock is dropped temporarily while the unblock event is
 * sent, and while waiting on the input condition at the end. It is held
 * again on return.
 */
static void
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
  GList *cur;
  GstEvent *event;

  /* On ENDING_FILE, the reference stream sends a command to start a new
   * fragment, then releases the GOP for output in the new fragment.
   * If some streams received no buffer during the last GOP that overran,
   * because their next buffer has a timestamp bigger than
   * ctx->max_in_running_time, their queue is empty. In that case the only
   * way to wake up the output thread is by injecting an event in the
   * queue. This usually happens with subtitle streams.
   * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
  if (ctx->need_unblock) {
    GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
    event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
        GST_EVENT_TYPE_SERIALIZED,
        gst_structure_new ("splitmuxsink-unblock", "timestamp",
            G_TYPE_INT64, splitmux->max_in_running_time, NULL));

    /* The event is sent without the splitmux lock held */
    GST_SPLITMUX_UNLOCK (splitmux);
    gst_pad_send_event (ctx->sinkpad, event);
    GST_SPLITMUX_LOCK (splitmux);

    ctx->need_unblock = FALSE;
    GST_SPLITMUX_BROADCAST_INPUT (splitmux);
    /* state may have changed while we were unlocked. Loop again if so */
    if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
      return;
  }

  if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
    gboolean ready = TRUE;

    /* Iterate each pad, and check that the input running time is at least
     * up to the reference running time, and if so handle the collected GOP */
    GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
        GST_STIME_FORMAT " ctx %p",
        GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
    for (cur = g_list_first (splitmux->contexts); cur != NULL;
        cur = g_list_next (cur)) {
      MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);

      GST_LOG_OBJECT (splitmux,
          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
          " EOS %d", tmpctx, tmpctx->srcpad,
          GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);

      /* A context that is behind max_in_running_time and not at EOS
       * means the GOP is not complete yet */
      if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
          tmpctx->in_running_time < splitmux->max_in_running_time &&
          !tmpctx->in_eos) {
        GST_LOG_OBJECT (splitmux,
            "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
            tmpctx, tmpctx->srcpad);
        ready = FALSE;
        break;
      }
    }
    if (ready) {
      GST_DEBUG_OBJECT (splitmux,
          "Collected GOP is complete. Processing (ctx %p)", ctx);
      /* All pads have a complete GOP, release it into the multiqueue */
      handle_gathered_gop (splitmux);

      /* The user has requested a split, we can split now that the previous GOP
       * has been collected to the correct location */
      if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
              FALSE)) {
        g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
      }
    }
  }

  /* If upstream reached EOS we are not expecting more data, no need to wait
   * here. */
  if (ctx->in_eos)
    return;

  /* Some pad is not yet ready, or GOP is being pushed
   * either way, sleep and wait to get woken */
  while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
      !ctx->flushing &&
      (ctx->in_running_time >= splitmux->max_in_running_time) &&
      (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {

    GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
    GST_SPLITMUX_WAIT_INPUT (splitmux);
    GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
  }
}
2214 
2215 static GstPadProbeReturn
handle_mq_input(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)2216 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2217 {
2218   GstSplitMuxSink *splitmux = ctx->splitmux;
2219   GstBuffer *buf;
2220   MqStreamBuf *buf_info = NULL;
2221   GstClockTime ts;
2222   gboolean loop_again;
2223   gboolean keyframe = FALSE;
2224 
2225   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2226 
2227   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2228   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2229     g_warning ("Buffer list handling not implemented");
2230     return GST_PAD_PROBE_DROP;
2231   }
2232   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2233       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2234     GstEvent *event = gst_pad_probe_info_get_event (info);
2235 
2236     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2237 
2238     switch (GST_EVENT_TYPE (event)) {
2239       case GST_EVENT_SEGMENT:
2240         gst_event_copy_segment (event, &ctx->in_segment);
2241         break;
2242       case GST_EVENT_FLUSH_STOP:
2243         GST_SPLITMUX_LOCK (splitmux);
2244         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2245         ctx->in_eos = FALSE;
2246 /* ohos.ext.func.0008
2247  * when GST_EVENT_FLUSH_STOP, ctx->in_running_time is set to none.
2248  * when GST_EVENT_EOS called after GST_EVENT_FLUSH_STOP, codes goes to handle_gathered_gop in
2249  * check_completed_gop. queued_gop_time is always below zero, and it failed.
2250  */
2251 #ifndef OHOS_EXT_FUNC
2252         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2253 #endif
2254         GST_SPLITMUX_UNLOCK (splitmux);
2255         break;
2256       case GST_EVENT_EOS:
2257         GST_SPLITMUX_LOCK (splitmux);
2258         ctx->in_eos = TRUE;
2259 
2260         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2261           goto beach;
2262 
2263         if (ctx->is_reference) {
2264           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2265           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2266           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2267           /* Wake up other input pads to collect this GOP */
2268           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2269           check_completed_gop (splitmux, ctx);
2270         } else if (splitmux->input_state ==
2271             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2272           /* If we are waiting for a GOP to be completed (ie, for aux
2273            * pads to catch up), then this pad is complete, so check
2274            * if the whole GOP is.
2275            */
2276           check_completed_gop (splitmux, ctx);
2277         }
2278         GST_SPLITMUX_UNLOCK (splitmux);
2279         break;
2280       case GST_EVENT_GAP:{
2281         GstClockTime gap_ts;
2282         GstClockTimeDiff rtime;
2283 
2284         gst_event_parse_gap (event, &gap_ts, NULL);
2285         if (gap_ts == GST_CLOCK_TIME_NONE)
2286           break;
2287 
2288         GST_SPLITMUX_LOCK (splitmux);
2289 
2290         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2291           goto beach;
2292         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2293 
2294         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2295             GST_STIME_ARGS (rtime));
2296 
2297         if (ctx->is_reference
2298             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2299           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2300           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2301               GST_STIME_ARGS (splitmux->fragment_start_time));
2302           /* Also take this as the first start time when starting up,
2303            * so that we start counting overflow from the first frame */
2304           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2305             splitmux->max_in_running_time = splitmux->fragment_start_time;
2306         }
2307 
2308         GST_SPLITMUX_UNLOCK (splitmux);
2309         break;
2310       }
2311       default:
2312         break;
2313     }
2314     return GST_PAD_PROBE_PASS;
2315   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2316     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2317       case GST_QUERY_ALLOCATION:
2318         return GST_PAD_PROBE_DROP;
2319       default:
2320         return GST_PAD_PROBE_PASS;
2321     }
2322   }
2323 
2324   buf = gst_pad_probe_info_get_buffer (info);
2325   buf_info = mq_stream_buf_new ();
2326 
2327   if (GST_BUFFER_PTS_IS_VALID (buf))
2328     ts = GST_BUFFER_PTS (buf);
2329   else
2330     ts = GST_BUFFER_DTS (buf);
2331 
2332 #ifdef OHOS_OPT_COMPAT
2333 // ohos.opt.compat.0018
2334 // to avoid when buffer take too many time in src,
2335 // cause pst comes to clock none.
2336   GST_BUFFER_PTS (buf) = ts;
2337   GST_BUFFER_DTS (buf) = ts;
2338 #endif
2339 
2340   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2341 
2342   GST_SPLITMUX_LOCK (splitmux);
2343 
2344   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2345     goto beach;
2346 
2347   /* If this buffer has a timestamp, advance the input timestamp of the
2348    * stream */
2349   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2350     GstClockTimeDiff running_time =
2351         my_segment_to_running_time (&ctx->in_segment, ts);
2352 
2353     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2354         GST_STIME_ARGS (running_time));
2355 
2356     if (GST_CLOCK_STIME_IS_VALID (running_time)
2357         && running_time > ctx->in_running_time)
2358       ctx->in_running_time = running_time;
2359   }
2360 
2361   /* Try to make sure we have a valid running time */
2362   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2363     ctx->in_running_time =
2364         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2365   }
2366 
2367   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2368       GST_STIME_ARGS (ctx->in_running_time));
2369 
2370   buf_info->run_ts = ctx->in_running_time;
2371   buf_info->buf_size = gst_buffer_get_size (buf);
2372   buf_info->duration = GST_BUFFER_DURATION (buf);
2373 
2374   /* initialize fragment_start_time */
2375   if (ctx->is_reference
2376       && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2377     splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2378     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2379         GST_STIME_ARGS (splitmux->fragment_start_time));
2380     gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2381 
2382     /* Also take this as the first start time when starting up,
2383      * so that we start counting overflow from the first frame */
2384     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2385       splitmux->max_in_running_time = splitmux->fragment_start_time;
2386     if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2387       GST_WARNING_OBJECT (splitmux,
2388           "Could not request a keyframe. Files may not split at the exact location they should");
2389     }
2390     gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2391   }
2392 
2393   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2394       " total GOP bytes %" G_GUINT64_FORMAT,
2395       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2396 
2397   loop_again = TRUE;
2398   do {
2399     if (ctx->flushing)
2400       break;
2401 
2402     switch (splitmux->input_state) {
2403       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2404         if (ctx->is_reference) {
2405           /* This is the reference context. If it's a keyframe,
2406            * it marks the start of a new GOP and we should wait in
2407            * check_completed_gop before continuing, but either way
2408            * (keyframe or no, we'll pass this buffer through after
2409            * so set loop_again to FALSE */
2410           loop_again = FALSE;
2411 
2412           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2413             /* Allow other input pads to catch up to here too */
2414             splitmux->max_in_running_time = ctx->in_running_time;
2415             GST_LOG_OBJECT (splitmux,
2416                 "Max in running time now %" GST_TIME_FORMAT,
2417                 GST_TIME_ARGS (splitmux->max_in_running_time));
2418             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2419             break;
2420           }
2421           GST_INFO_OBJECT (pad,
2422               "Have keyframe with running time %" GST_STIME_FORMAT,
2423               GST_STIME_ARGS (ctx->in_running_time));
2424           keyframe = TRUE;
2425           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2426           splitmux->max_in_running_time = ctx->in_running_time;
2427           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2428               GST_TIME_ARGS (splitmux->max_in_running_time));
2429           /* Wake up other input pads to collect this GOP */
2430           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2431           check_completed_gop (splitmux, ctx);
2432           /* Store this new keyframe to remember the start of GOP */
2433           gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2434         } else {
2435           /* Pass this buffer if the reference ctx is far enough ahead */
2436           if (ctx->in_running_time < splitmux->max_in_running_time) {
2437             loop_again = FALSE;
2438             break;
2439           }
2440 
2441           /* We're still waiting for a keyframe on the reference pad, sleep */
2442           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2443           GST_SPLITMUX_WAIT_INPUT (splitmux);
2444           GST_LOG_OBJECT (pad,
2445               "Done sleeping for GOP start input state now %d",
2446               splitmux->input_state);
2447         }
2448         break;
2449       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2450         /* We're collecting a GOP. If this is the reference context,
2451          * we need to check if this is a keyframe that marks the start
2452          * of the next GOP. If it is, it marks the end of the GOP we're
2453          * collecting, so sleep and wait until all the other pads also
2454          * reach that timestamp - at which point, we have an entire GOP
2455          * and either go to ENDING_FILE or release this GOP to the muxer and
2456          * go back to COLLECT_GOP_START. */
2457 
2458         /* If we overran the target timestamp, it might be time to process
2459          * the GOP, otherwise bail out for more data
2460          */
2461         GST_LOG_OBJECT (pad,
2462             "Checking TS %" GST_STIME_FORMAT " against max %"
2463             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2464             GST_STIME_ARGS (splitmux->max_in_running_time));
2465 
2466         if (ctx->in_running_time < splitmux->max_in_running_time) {
2467           loop_again = FALSE;
2468           break;
2469         }
2470 
2471         GST_LOG_OBJECT (pad,
2472             "Collected last packet of GOP. Checking other pads");
2473         check_completed_gop (splitmux, ctx);
2474         break;
2475       }
2476       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2477         loop_again = FALSE;
2478         break;
2479       default:
2480         loop_again = FALSE;
2481         break;
2482     }
2483   }
2484   while (loop_again);
2485 
2486   if (keyframe) {
2487     splitmux->queued_keyframes++;
2488     buf_info->keyframe = TRUE;
2489   }
2490 
2491   /* Update total input byte counter for overflow detect */
2492   splitmux->gop_total_bytes += buf_info->buf_size;
2493 
2494   /* Now add this buffer to the queue just before returning */
2495   g_queue_push_head (&ctx->queued_bufs, buf_info);
2496 
2497   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2498       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2499 
2500   GST_SPLITMUX_UNLOCK (splitmux);
2501   return GST_PAD_PROBE_PASS;
2502 
2503 beach:
2504   GST_SPLITMUX_UNLOCK (splitmux);
2505   if (buf_info)
2506     mq_stream_buf_free (buf_info);
2507   return GST_PAD_PROBE_PASS;
2508 }
2509 
2510 static void
grow_blocked_queues(GstSplitMuxSink * splitmux)2511 grow_blocked_queues (GstSplitMuxSink * splitmux)
2512 {
2513   GList *cur;
2514 
2515   /* Scan other queues for full-ness and grow them */
2516   for (cur = g_list_first (splitmux->contexts);
2517       cur != NULL; cur = g_list_next (cur)) {
2518     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2519     guint cur_limit;
2520     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2521 
2522     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2523     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2524 
2525     if (cur_len >= cur_limit) {
2526       cur_limit = cur_len + 1;
2527       GST_DEBUG_OBJECT (tmpctx->q,
2528           "Queue overflowed and needs enlarging. Growing to %u buffers",
2529           cur_limit);
2530       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2531     }
2532   }
2533 }
2534 
2535 static void
handle_q_underrun(GstElement * q,gpointer user_data)2536 handle_q_underrun (GstElement * q, gpointer user_data)
2537 {
2538   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2539   GstSplitMuxSink *splitmux = ctx->splitmux;
2540 
2541   GST_SPLITMUX_LOCK (splitmux);
2542   GST_DEBUG_OBJECT (q,
2543       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2544       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2545   grow_blocked_queues (splitmux);
2546   GST_SPLITMUX_UNLOCK (splitmux);
2547 }
2548 
2549 static void
handle_q_overrun(GstElement * q,gpointer user_data)2550 handle_q_overrun (GstElement * q, gpointer user_data)
2551 {
2552   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2553   GstSplitMuxSink *splitmux = ctx->splitmux;
2554   gboolean allow_grow = FALSE;
2555 
2556   GST_SPLITMUX_LOCK (splitmux);
2557   GST_DEBUG_OBJECT (q,
2558       "Queue reported overrun with %d keyframes and %d cmds enqueued",
2559       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2560 
2561   if (splitmux->queued_keyframes < 2) {
2562     /* Less than a full GOP queued, grow the queue */
2563     allow_grow = TRUE;
2564   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2565     allow_grow = TRUE;
2566   } else {
2567     /* If another queue is starved, grow */
2568     GList *cur;
2569     for (cur = g_list_first (splitmux->contexts);
2570         cur != NULL; cur = g_list_next (cur)) {
2571       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2572       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2573         allow_grow = TRUE;
2574       }
2575     }
2576   }
2577   GST_SPLITMUX_UNLOCK (splitmux);
2578 
2579   if (allow_grow) {
2580     guint cur_limit;
2581 
2582     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2583     cur_limit++;
2584 
2585     GST_DEBUG_OBJECT (q,
2586         "Queue overflowed and needs enlarging. Growing to %u buffers",
2587         cur_limit);
2588 
2589     g_object_set (q, "max-size-buffers", cur_limit, NULL);
2590   }
2591 }
2592 
2593 static GstPad *
gst_splitmux_sink_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)2594 gst_splitmux_sink_request_new_pad (GstElement * element,
2595     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2596 {
2597   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2598   GstPadTemplate *mux_template = NULL;
2599   GstPad *res = NULL;
2600   GstElement *q;
2601   GstPad *q_sink = NULL, *q_src = NULL;
2602   gchar *gname;
2603   gboolean is_video = FALSE;
2604   MqStreamCtx *ctx;
2605 
2606   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2607 
2608   GST_SPLITMUX_LOCK (splitmux);
2609   if (!create_muxer (splitmux))
2610     goto fail;
2611   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2612 
2613   if (templ->name_template) {
2614     if (g_str_equal (templ->name_template, "video")) {
2615       if (splitmux->have_video)
2616         goto already_have_video;
2617 
2618       /* FIXME: Look for a pad template with matching caps, rather than by name */
2619       GST_DEBUG_OBJECT (element,
2620           "searching for pad-template with name 'video_%%u'");
2621       mux_template =
2622           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2623           (splitmux->muxer), "video_%u");
2624 
2625       /* Fallback to find sink pad templates named 'video' (flvmux) */
2626       if (!mux_template) {
2627         GST_DEBUG_OBJECT (element,
2628             "searching for pad-template with name 'video'");
2629         mux_template =
2630             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2631             (splitmux->muxer), "video");
2632       }
2633       is_video = TRUE;
2634       name = NULL;
2635     } else {
2636       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
2637           templ->name_template);
2638       mux_template =
2639           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2640           (splitmux->muxer), templ->name_template);
2641 
2642       /* Fallback to find sink pad templates named 'audio' (flvmux) */
2643       if (!mux_template) {
2644         GST_DEBUG_OBJECT (element,
2645             "searching for pad-template with name 'audio'");
2646         mux_template =
2647             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2648             (splitmux->muxer), "audio");
2649         name = NULL;
2650       }
2651     }
2652 
2653     if (mux_template == NULL) {
2654       GST_DEBUG_OBJECT (element,
2655           "searching for pad-template with name 'sink_%%d'");
2656       mux_template =
2657           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2658           (splitmux->muxer), "sink_%d");
2659       name = NULL;
2660     }
2661     if (mux_template == NULL) {
2662       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
2663       mux_template =
2664           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2665           (splitmux->muxer), "sink");
2666       name = NULL;
2667     }
2668   }
2669 
2670   if (mux_template == NULL) {
2671     GST_ERROR_OBJECT (element,
2672         "unable to find a suitable sink pad-template on the muxer");
2673 
2674     goto fail;
2675   }
2676   GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
2677       mux_template->name_template);
2678 
2679   if (mux_template->presence == GST_PAD_REQUEST) {
2680     GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
2681 
2682     res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2683     if (res == NULL)
2684       goto fail;
2685   } else if (mux_template->presence == GST_PAD_ALWAYS) {
2686     GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
2687 
2688     res =
2689         gst_element_get_static_pad (splitmux->muxer,
2690         mux_template->name_template);
2691     if (res == NULL)
2692       goto fail;
2693   } else {
2694     GST_ERROR_OBJECT (element,
2695         "unexpected pad presence %d", mux_template->presence);
2696 
2697     goto fail;
2698   }
2699 
2700   if (is_video)
2701     gname = g_strdup ("video");
2702   else if (name == NULL)
2703     gname = gst_pad_get_name (res);
2704   else
2705     gname = g_strdup (name);
2706 
2707   if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2708     goto fail;
2709 
2710   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2711 
2712   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2713       "max-size-buffers", 5, NULL);
2714 
2715   q_sink = gst_element_get_static_pad (q, "sink");
2716   q_src = gst_element_get_static_pad (q, "src");
2717 
2718   if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2719     gst_element_release_request_pad (splitmux->muxer, res);
2720     gst_object_unref (GST_OBJECT (res));
2721     goto fail;
2722   }
2723 
2724   gst_object_unref (GST_OBJECT (res));
2725 
2726   ctx = mq_stream_ctx_new (splitmux);
2727   /* Context holds a ref: */
2728   ctx->q = gst_object_ref (q);
2729   ctx->srcpad = q_src;
2730   ctx->sinkpad = q_sink;
2731   ctx->q_overrun_id =
2732       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2733   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2734 
2735   ctx->src_pad_block_id =
2736       gst_pad_add_probe (q_src,
2737       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2738       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2739   if (is_video && splitmux->reference_ctx != NULL) {
2740     splitmux->reference_ctx->is_reference = FALSE;
2741     splitmux->reference_ctx = NULL;
2742   }
2743   if (splitmux->reference_ctx == NULL) {
2744     splitmux->reference_ctx = ctx;
2745     ctx->is_reference = TRUE;
2746   }
2747 
2748   res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2749   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2750 
2751   ctx->sink_pad_block_id =
2752       gst_pad_add_probe (q_sink,
2753       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2754       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2755       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2756 
2757   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2758       " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2759 
2760   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2761 
2762   g_free (gname);
2763 
2764   if (is_video)
2765     splitmux->have_video = TRUE;
2766 
2767   gst_pad_set_active (res, TRUE);
2768   gst_element_add_pad (element, res);
2769 
2770   GST_SPLITMUX_UNLOCK (splitmux);
2771 
2772   return res;
2773 fail:
2774   GST_SPLITMUX_UNLOCK (splitmux);
2775 
2776   if (q_sink)
2777     gst_object_unref (q_sink);
2778   if (q_src)
2779     gst_object_unref (q_src);
2780   return NULL;
2781 already_have_video:
2782   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2783   GST_SPLITMUX_UNLOCK (splitmux);
2784   return NULL;
2785 }
2786 
2787 static void
gst_splitmux_sink_release_pad(GstElement * element,GstPad * pad)2788 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2789 {
2790   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2791   GstPad *muxpad = NULL;
2792   MqStreamCtx *ctx =
2793       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2794 
2795   GST_SPLITMUX_LOCK (splitmux);
2796 
2797   if (splitmux->muxer == NULL)
2798     goto fail;                  /* Elements don't exist yet - nothing to release */
2799 
2800   GST_INFO_OBJECT (pad, "releasing request pad");
2801 
2802   muxpad = gst_pad_get_peer (ctx->srcpad);
2803 
2804   /* Remove the context from our consideration */
2805   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2806 
2807   if (ctx->sink_pad_block_id)
2808     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2809 
2810   if (ctx->src_pad_block_id)
2811     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2812 
2813   /* Can release the context now */
2814   mq_stream_ctx_free (ctx);
2815   if (ctx == splitmux->reference_ctx)
2816     splitmux->reference_ctx = NULL;
2817 
2818   /* Release and free the muxer input */
2819   if (muxpad) {
2820     gst_element_release_request_pad (splitmux->muxer, muxpad);
2821     gst_object_unref (muxpad);
2822   }
2823 
2824   if (GST_PAD_PAD_TEMPLATE (pad) &&
2825       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2826               (pad)), "video"))
2827     splitmux->have_video = FALSE;
2828 
2829   gst_element_remove_pad (element, pad);
2830 
2831   /* Reset the internal elements only after all request pads are released */
2832   if (splitmux->contexts == NULL)
2833     gst_splitmux_reset (splitmux);
2834 
2835 fail:
2836   GST_SPLITMUX_UNLOCK (splitmux);
2837 }
2838 
2839 static GstElement *
create_element(GstSplitMuxSink * splitmux,const gchar * factory,const gchar * name,gboolean locked)2840 create_element (GstSplitMuxSink * splitmux,
2841     const gchar * factory, const gchar * name, gboolean locked)
2842 {
2843   GstElement *ret = gst_element_factory_make (factory, name);
2844   if (ret == NULL) {
2845     g_warning ("Failed to create %s - splitmuxsink will not work", name);
2846     return NULL;
2847   }
2848 
2849   if (locked) {
2850     /* Ensure the sink starts in locked state and NULL - it will be changed
2851      * by the filename setting code */
2852     gst_element_set_locked_state (ret, TRUE);
2853     gst_element_set_state (ret, GST_STATE_NULL);
2854   }
2855 
2856   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2857     g_warning ("Could not add %s element - splitmuxsink will not work", name);
2858     gst_object_unref (ret);
2859     return NULL;
2860   }
2861 
2862   return ret;
2863 }
2864 
2865 static gboolean
create_muxer(GstSplitMuxSink * splitmux)2866 create_muxer (GstSplitMuxSink * splitmux)
2867 {
2868   /* Create internal elements */
2869   if (splitmux->muxer == NULL) {
2870     GstElement *provided_muxer = NULL;
2871 
2872     GST_OBJECT_LOCK (splitmux);
2873     if (splitmux->provided_muxer != NULL)
2874       provided_muxer = gst_object_ref (splitmux->provided_muxer);
2875     GST_OBJECT_UNLOCK (splitmux);
2876 
2877     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2878         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2879       if ((splitmux->muxer =
2880               create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2881         goto fail;
2882     } else if (splitmux->async_finalize) {
2883       if ((splitmux->muxer =
2884               create_element (splitmux, splitmux->muxer_factory, "muxer",
2885                   FALSE)) == NULL)
2886         goto fail;
2887       if (splitmux->muxer_properties)
2888         gst_structure_foreach (splitmux->muxer_properties,
2889             _set_property_from_structure, splitmux->muxer);
2890     } else {
2891       /* Ensure it's not in locked state (we might be reusing an old element) */
2892       gst_element_set_locked_state (provided_muxer, FALSE);
2893       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2894         g_warning ("Could not add muxer element - splitmuxsink will not work");
2895         gst_object_unref (provided_muxer);
2896         goto fail;
2897       }
2898 
2899       splitmux->muxer = provided_muxer;
2900       gst_object_unref (provided_muxer);
2901     }
2902 
2903     if (splitmux->use_robust_muxing) {
2904       update_muxer_properties (splitmux);
2905     }
2906   }
2907 
2908   return TRUE;
2909 fail:
2910   return FALSE;
2911 }
2912 
2913 static GstElement *
find_sink(GstElement * e)2914 find_sink (GstElement * e)
2915 {
2916   GstElement *res = NULL;
2917   GstIterator *iter;
2918   gboolean done = FALSE;
2919   GValue data = { 0, };
2920 
2921   if (!GST_IS_BIN (e))
2922     return e;
2923 
2924   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2925     return e;
2926 
2927   iter = gst_bin_iterate_sinks (GST_BIN (e));
2928   while (!done) {
2929     switch (gst_iterator_next (iter, &data)) {
2930       case GST_ITERATOR_OK:
2931       {
2932         GstElement *child = g_value_get_object (&data);
2933         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2934                 "location") != NULL) {
2935           res = child;
2936           done = TRUE;
2937         }
2938         g_value_reset (&data);
2939         break;
2940       }
2941       case GST_ITERATOR_RESYNC:
2942         gst_iterator_resync (iter);
2943         break;
2944       case GST_ITERATOR_DONE:
2945         done = TRUE;
2946         break;
2947       case GST_ITERATOR_ERROR:
2948         g_assert_not_reached ();
2949         break;
2950     }
2951   }
2952   g_value_unset (&data);
2953   gst_iterator_free (iter);
2954 
2955   return res;
2956 }
2957 
2958 static gboolean
create_sink(GstSplitMuxSink * splitmux)2959 create_sink (GstSplitMuxSink * splitmux)
2960 {
2961   GstElement *provided_sink = NULL;
2962 
2963   if (splitmux->active_sink == NULL) {
2964 
2965     GST_OBJECT_LOCK (splitmux);
2966     if (splitmux->provided_sink != NULL)
2967       provided_sink = gst_object_ref (splitmux->provided_sink);
2968     GST_OBJECT_UNLOCK (splitmux);
2969 
2970     if ((!splitmux->async_finalize && provided_sink == NULL) ||
2971         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2972       if ((splitmux->sink =
2973               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2974         goto fail;
2975       splitmux->active_sink = splitmux->sink;
2976     } else if (splitmux->async_finalize) {
2977       if ((splitmux->sink =
2978               create_element (splitmux, splitmux->sink_factory, "sink",
2979                   TRUE)) == NULL)
2980         goto fail;
2981       if (splitmux->sink_properties)
2982         gst_structure_foreach (splitmux->sink_properties,
2983             _set_property_from_structure, splitmux->sink);
2984       splitmux->active_sink = splitmux->sink;
2985     } else {
2986       /* Ensure the sink starts in locked state and NULL - it will be changed
2987        * by the filename setting code */
2988       gst_element_set_locked_state (provided_sink, TRUE);
2989       gst_element_set_state (provided_sink, GST_STATE_NULL);
2990       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2991         g_warning ("Could not add sink elements - splitmuxsink will not work");
2992         gst_object_unref (provided_sink);
2993         goto fail;
2994       }
2995 
2996       splitmux->active_sink = provided_sink;
2997 
2998       /* The bin holds a ref now, we can drop our tmp ref */
2999       gst_object_unref (provided_sink);
3000 
3001       /* Find the sink element */
3002       splitmux->sink = find_sink (splitmux->active_sink);
3003       if (splitmux->sink == NULL) {
3004         g_warning
3005             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3006         goto fail;
3007       }
3008     }
3009 
3010 #if 1
3011     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3012             "async") != NULL) {
3013       /* async child elements are causing state change races and weird
3014        * failures, so let's try and turn that off */
3015       g_object_set (splitmux->sink, "async", FALSE, NULL);
3016     }
3017 #endif
3018 
3019     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3020       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3021       goto fail;
3022     }
3023   }
3024 
3025   return TRUE;
3026 fail:
3027   return FALSE;
3028 }
3029 
3030 #ifdef __GNUC__
3031 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3032 #endif
3033 static void
set_next_filename(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)3034 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3035 {
3036   gchar *fname = NULL;
3037   GstSample *sample;
3038   GstCaps *caps;
3039 
3040   gst_splitmux_sink_ensure_max_files (splitmux);
3041 
3042   if (ctx->cur_out_buffer == NULL) {
3043     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3044   }
3045 
3046   caps = gst_pad_get_current_caps (ctx->srcpad);
3047   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3048   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3049       splitmux->fragment_id, sample, &fname);
3050   gst_sample_unref (sample);
3051   if (caps)
3052     gst_caps_unref (caps);
3053 
3054   if (fname == NULL) {
3055     /* Fallback to the old signal if the new one returned nothing */
3056     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3057         splitmux->fragment_id, &fname);
3058   }
3059 
3060   if (!fname)
3061     fname = splitmux->location ?
3062         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3063 
3064   if (fname) {
3065     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3066     g_object_set (splitmux->sink, "location", fname, NULL);
3067     g_free (fname);
3068 
3069     splitmux->fragment_id++;
3070   }
3071 }
3072 
3073 static void
do_async_start(GstSplitMuxSink * splitmux)3074 do_async_start (GstSplitMuxSink * splitmux)
3075 {
3076   GstMessage *message;
3077 
3078   if (!splitmux->need_async_start) {
3079     GST_INFO_OBJECT (splitmux, "no async_start needed");
3080     return;
3081   }
3082 
3083   splitmux->async_pending = TRUE;
3084 
3085   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3086   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3087   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3088       (splitmux), message);
3089 }
3090 
3091 static void
do_async_done(GstSplitMuxSink * splitmux)3092 do_async_done (GstSplitMuxSink * splitmux)
3093 {
3094   GstMessage *message;
3095 
3096   if (splitmux->async_pending) {
3097     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3098     message =
3099         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3100         GST_CLOCK_TIME_NONE);
3101     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3102         (splitmux), message);
3103 
3104     splitmux->async_pending = FALSE;
3105   }
3106 
3107   splitmux->need_async_start = FALSE;
3108 }
3109 
3110 static GstStateChangeReturn
gst_splitmux_sink_change_state(GstElement * element,GstStateChange transition)3111 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3112 {
3113   GstStateChangeReturn ret;
3114   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3115 
3116   switch (transition) {
3117     case GST_STATE_CHANGE_NULL_TO_READY:{
3118       GST_SPLITMUX_LOCK (splitmux);
3119       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3120         ret = GST_STATE_CHANGE_FAILURE;
3121         GST_SPLITMUX_UNLOCK (splitmux);
3122         goto beach;
3123       }
3124       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3125       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3126       GST_SPLITMUX_UNLOCK (splitmux);
3127       splitmux->fragment_id = 0;
3128       break;
3129     }
3130     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3131       GST_SPLITMUX_LOCK (splitmux);
3132       /* Start by collecting one input on each pad */
3133       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3134       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3135       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3136       splitmux->gop_start_time = splitmux->fragment_start_time =
3137           GST_CLOCK_STIME_NONE;
3138       splitmux->muxed_out_bytes = 0;
3139       splitmux->ready_for_output = FALSE;
3140       GST_SPLITMUX_UNLOCK (splitmux);
3141       break;
3142     }
3143     case GST_STATE_CHANGE_PAUSED_TO_READY:
3144       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3145       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3146     case GST_STATE_CHANGE_READY_TO_NULL:
3147       GST_SPLITMUX_LOCK (splitmux);
3148       gst_queue_array_clear (splitmux->times_to_split);
3149       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3150       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3151       /* Wake up any blocked threads */
3152       GST_LOG_OBJECT (splitmux,
3153           "State change -> NULL or READY. Waking threads");
3154       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3155       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3156       GST_SPLITMUX_UNLOCK (splitmux);
3157       break;
3158     default:
3159       break;
3160   }
3161 
3162   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3163   if (ret == GST_STATE_CHANGE_FAILURE)
3164     goto beach;
3165 
3166   switch (transition) {
3167     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3168       splitmux->need_async_start = TRUE;
3169       break;
3170     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3171       /* Change state async, because our child sink might not
3172        * be ready to do that for us yet if it's state is still locked */
3173 
3174       splitmux->need_async_start = TRUE;
3175       /* we want to go async to PAUSED until we managed to configure and add the
3176        * sink */
3177       GST_SPLITMUX_LOCK (splitmux);
3178       do_async_start (splitmux);
3179       GST_SPLITMUX_UNLOCK (splitmux);
3180       ret = GST_STATE_CHANGE_ASYNC;
3181       break;
3182     }
3183     case GST_STATE_CHANGE_READY_TO_NULL:
3184       GST_SPLITMUX_LOCK (splitmux);
3185       splitmux->fragment_id = 0;
3186       /* Reset internal elements only if no pad contexts are using them */
3187       if (splitmux->contexts == NULL)
3188         gst_splitmux_reset (splitmux);
3189       do_async_done (splitmux);
3190       GST_SPLITMUX_UNLOCK (splitmux);
3191       break;
3192     default:
3193       break;
3194   }
3195 
3196 beach:
3197   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3198       ret == GST_STATE_CHANGE_FAILURE) {
3199     /* Cleanup elements on failed transition out of NULL */
3200     gst_splitmux_reset (splitmux);
3201     GST_SPLITMUX_LOCK (splitmux);
3202     do_async_done (splitmux);
3203     GST_SPLITMUX_UNLOCK (splitmux);
3204   }
3205   return ret;
3206 }
3207 
3208 gboolean
register_splitmuxsink(GstPlugin * plugin)3209 register_splitmuxsink (GstPlugin * plugin)
3210 {
3211   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3212       "Split File Muxing Sink");
3213 
3214   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3215       GST_TYPE_SPLITMUX_SINK);
3216 }
3217 
3218 static void
gst_splitmux_sink_ensure_max_files(GstSplitMuxSink * splitmux)3219 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3220 {
3221   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3222     splitmux->fragment_id = 0;
3223   }
3224 }
3225 
3226 static void
split_now(GstSplitMuxSink * splitmux)3227 split_now (GstSplitMuxSink * splitmux)
3228 {
3229   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3230 }
3231 
3232 static void
split_after(GstSplitMuxSink * splitmux)3233 split_after (GstSplitMuxSink * splitmux)
3234 {
3235   g_atomic_int_set (&(splitmux->split_requested), TRUE);
3236 }
3237 
3238 static void
split_at_running_time(GstSplitMuxSink * splitmux,GstClockTime split_time)3239 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3240 {
3241   gboolean send_keyframe_requests;
3242 
3243   GST_SPLITMUX_LOCK (splitmux);
3244   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3245   send_keyframe_requests = splitmux->send_keyframe_requests;
3246   GST_SPLITMUX_UNLOCK (splitmux);
3247 
3248   if (send_keyframe_requests) {
3249     GstEvent *ev =
3250         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3251     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3252         GST_TIME_ARGS (split_time));
3253     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3254       GST_WARNING_OBJECT (splitmux,
3255           "Could not request keyframe at %" GST_TIME_FORMAT,
3256           GST_TIME_ARGS (split_time));
3257     }
3258   }
3259 }
3260