1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /**
21 * SECTION:element-splitmuxsink
22 * @title: splitmuxsink
23 * @short_description: Muxer wrapper for splitting output stream by size or time
24 *
25 * This element wraps a muxer and a sink, and starts a new file when the mux
26 * contents are about to cross a threshold of maximum size of maximum time,
27 * splitting at video keyframe boundaries. Exactly one input video stream
28 * can be muxed, with as many accompanying audio and subtitle streams as
29 * desired.
30 *
31 * By default, it uses mp4mux and filesink, but they can be changed via
32 * the 'muxer' and 'sink' properties.
33 *
34 * The minimum file size is 1 GOP, however - so limits may be overrun if the
35 * distance between any 2 keyframes is larger than the limits.
36 *
37 * If a video stream is available, the splitting process is driven by the video
38 * stream contents, and the video stream must contain closed GOPs for the output
39 * file parts to be played individually correctly. In the absence of a video
40 * stream, the first available stream is used as reference for synchronization.
41 *
42 * In the async-finalize mode, when the threshold is crossed, the old muxer
43 * and sink is disconnected from the pipeline and left to finish the file
44 * asynchronously, and a new muxer and sink is created to continue with the
45 * next fragment. For that reason, instead of muxer and sink objects, the
46 * muxer-factory and sink-factory properties are used to construct the new
47 * objects, together with muxer-properties and sink-properties.
48 *
49 * ## Example pipelines
50 * |[
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52 * ]|
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
56 *
57 * |[
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59 * ]|
60 * Records a video stream captured from a v4l2 device and muxer it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
63 *
64 * |[
65 * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66 * ]|
67 * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68 * it will deliver to.
69 */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97 GstClockTime split_time);
98
99 enum
100 {
101 PROP_0,
102 PROP_LOCATION,
103 PROP_START_INDEX,
104 PROP_MAX_SIZE_TIME,
105 PROP_MAX_SIZE_BYTES,
106 PROP_MAX_SIZE_TIMECODE,
107 PROP_SEND_KEYFRAME_REQUESTS,
108 PROP_MAX_FILES,
109 PROP_MUXER_OVERHEAD,
110 PROP_USE_ROBUST_MUXING,
111 PROP_ALIGNMENT_THRESHOLD,
112 PROP_MUXER,
113 PROP_SINK,
114 PROP_RESET_MUXER,
115 PROP_ASYNC_FINALIZE,
116 PROP_MUXER_FACTORY,
117 PROP_MUXER_PRESET,
118 PROP_MUXER_PROPERTIES,
119 PROP_SINK_FACTORY,
120 PROP_SINK_PRESET,
121 PROP_SINK_PROPERTIES,
122 PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME 0
126 #define DEFAULT_MAX_SIZE_BYTES 0
127 #define DEFAULT_MAX_FILES 0
128 #define DEFAULT_MUXER_OVERHEAD 0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140 MqStreamCtx *ctx;
141 GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146 SIGNAL_FORMAT_LOCATION,
147 SIGNAL_FORMAT_LOCATION_FULL,
148 SIGNAL_SPLIT_NOW,
149 SIGNAL_SPLIT_AFTER,
150 SIGNAL_SPLIT_AT_RUNNING_TIME,
151 SIGNAL_MUXER_ADDED,
152 SIGNAL_SINK_ADDED,
153 SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160 GST_PAD_SINK,
161 GST_PAD_REQUEST,
162 GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165 GST_PAD_SINK,
166 GST_PAD_REQUEST,
167 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170 GST_PAD_SINK,
171 GST_PAD_REQUEST,
172 GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175 GST_PAD_SINK,
176 GST_PAD_REQUEST,
177 GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180 GST_PAD_SINK,
181 GST_PAD_REQUEST,
182 GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188 * to forward an incoming EOS message, but we cannot rely on the state of the
189 * splitmux anymore, so we set this qdata on the sink instead.
190 * The muxer and sink must be destroyed after both of these things have
191 * finished:
192 * 1) The EOS message has been sent when the fragment is ending
193 * 2) The muxer has been unlinked and relinked
194 * Therefore, EOS_FROM_US can have these two values:
195 * 0: EOS was not requested from us. Forward the message. The muxer and the
196 * sink will be destroyed together with the rest of the bin.
197 * 1: EOS was requested from us, but the other of the two tasks hasn't
198 * finished. Set EOS_FROM_US to 2 and do your stuff.
199 * 2: EOS was requested from us and the other of the two tasks has finished.
200 * Now we can destroy the muxer and the sink.
201 */
202
203 static void
_do_init(void)204 _do_init (void)
205 {
206 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208 RUNNING_TIME = g_quark_from_static_string ("running-time");
209 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210 "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215 _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217 GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222 const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224 GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233 element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238 MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244 const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250 GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
mq_stream_buf_new(void)253 mq_stream_buf_new (void)
254 {
255 return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
mq_stream_buf_free(MqStreamBuf * data)259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261 g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
out_cmd_buf_new(void)265 out_cmd_buf_new (void)
266 {
267 return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
out_cmd_buf_free(SplitMuxOutputCommand * data)271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273 g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
input_gop_free(InputGop * gop)277 input_gop_free (InputGop * gop)
278 {
279 g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280 g_slice_free (InputGop, gop);
281 }
282
283 static void
gst_splitmux_sink_class_init(GstSplitMuxSinkClass * klass)284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286 GObjectClass *gobject_class = (GObjectClass *) klass;
287 GstElementClass *gstelement_class = (GstElementClass *) klass;
288 GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290 gobject_class->set_property = gst_splitmux_sink_set_property;
291 gobject_class->get_property = gst_splitmux_sink_get_property;
292 gobject_class->dispose = gst_splitmux_sink_dispose;
293 gobject_class->finalize = gst_splitmux_sink_finalize;
294
295 gst_element_class_set_static_metadata (gstelement_class,
296 "Split Muxing Bin", "Generic/Bin/Muxer",
297 "Convenience bin that muxes incoming streams into multiple time/size limited files",
298 "Jan Schmidt <jan@centricular.com>");
299
300 gst_element_class_add_static_pad_template (gstelement_class,
301 &video_sink_template);
302 gst_element_class_add_static_pad_template (gstelement_class,
303 &video_aux_sink_template);
304 gst_element_class_add_static_pad_template (gstelement_class,
305 &audio_sink_template);
306 gst_element_class_add_static_pad_template (gstelement_class,
307 &subtitle_sink_template);
308 gst_element_class_add_static_pad_template (gstelement_class,
309 &caption_sink_template);
310
311 gstelement_class->change_state =
312 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313 gstelement_class->request_new_pad =
314 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315 gstelement_class->release_pad =
316 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318 gstbin_class->handle_message = bus_handler;
319
320 g_object_class_install_property (gobject_class, PROP_LOCATION,
321 g_param_spec_string ("location", "File Output Pattern",
322 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325 g_param_spec_double ("mux-overhead", "Muxing Overhead",
326 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327 DEFAULT_MUXER_OVERHEAD,
328 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333 DEFAULT_MAX_SIZE_TIME,
334 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335 G_PARAM_STATIC_STRINGS));
336 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339 DEFAULT_MAX_SIZE_BYTES,
340 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341 G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344 "Maximum difference in timecode between first and last frame. "
345 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346 "Will only be effective if a timecode track is present.", NULL,
347 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348 G_PARAM_STATIC_STRINGS));
349 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350 g_param_spec_boolean ("send-keyframe-requests",
351 "Request keyframes at max-size-time",
352 "Request a keyframe every max-size-time ns to try splitting at that point. "
353 "Needs max-size-bytes to be 0 in order to be effective.",
354 DEFAULT_SEND_KEYFRAME_REQUESTS,
355 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356 G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358 g_param_spec_uint ("max-files", "Max files",
359 "Maximum number of files to keep on disk. Once the maximum is reached,"
360 "old files start to be deleted to make room for new ones.", 0,
361 G_MAXUINT, DEFAULT_MAX_FILES,
362 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365 "Allow non-reference streams to be that many ns before the reference"
366 " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368 G_PARAM_STATIC_STRINGS));
369
370 g_object_class_install_property (gobject_class, PROP_MUXER,
371 g_param_spec_object ("muxer", "Muxer",
372 "The muxer element to use (NULL = default mp4mux). "
373 "Valid only for async-finalize = FALSE",
374 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375 g_object_class_install_property (gobject_class, PROP_SINK,
376 g_param_spec_object ("sink", "Sink",
377 "The sink element (or element chain) to use (NULL = default filesink). "
378 "Valid only for async-finalize = FALSE",
379 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382 g_param_spec_boolean ("use-robust-muxing",
383 "Support robust-muxing mode of some muxers",
384 "Check if muxers support robust muxing via the reserved-max-duration and "
385 "reserved-duration-remaining properties and use them if so. "
386 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387 " create new fragments if the reserved header space is about to overflow. "
388 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389 "manually by the app to a non-zero value for robust muxing to have an effect.",
390 DEFAULT_USE_ROBUST_MUXING,
391 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394 g_param_spec_boolean ("reset-muxer",
395 "Reset Muxer",
396 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400 g_param_spec_boolean ("async-finalize",
401 "Finalize fragments asynchronously",
402 "Finalize each fragment asynchronously and start a new one",
403 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405 g_param_spec_string ("muxer-factory", "Muxer factory",
406 "The muxer element factory to use (default = mp4mux). "
407 "Valid only for async-finalize = TRUE",
408 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409 /**
410 * GstSplitMuxSink:muxer-preset
411 *
412 * An optional #GstPreset name to use for the muxer. This only has an effect
413 * in `async-finalize=TRUE` mode.
414 *
415 * Since: 1.18
416 */
417 g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418 g_param_spec_string ("muxer-preset", "Muxer preset",
419 "The muxer preset to use. "
420 "Valid only for async-finalize = TRUE",
421 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423 g_param_spec_boxed ("muxer-properties", "Muxer properties",
424 "The muxer element properties to use. "
425 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426 "Valid only for async-finalize = TRUE",
427 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429 g_param_spec_string ("sink-factory", "Sink factory",
430 "The sink element factory to use (default = filesink). "
431 "Valid only for async-finalize = TRUE",
432 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433 /**
434 * GstSplitMuxSink:sink-preset
435 *
436 * An optional #GstPreset name to use for the sink. This only has an effect
437 * in `async-finalize=TRUE` mode.
438 *
439 * Since: 1.18
440 */
441 g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442 g_param_spec_string ("sink-preset", "Sink preset",
443 "The sink preset to use. "
444 "Valid only for async-finalize = TRUE",
445 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447 g_param_spec_boxed ("sink-properties", "Sink properties",
448 "The sink element properties to use. "
449 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450 "Valid only for async-finalize = TRUE",
451 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452 g_object_class_install_property (gobject_class, PROP_START_INDEX,
453 g_param_spec_int ("start-index", "Start Index",
454 "Start value of fragment index.",
455 0, G_MAXINT, DEFAULT_START_INDEX,
456 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458 /**
459 * GstSplitMuxSink::muxer-pad-map
460 *
461 * An optional GstStructure that provides a map from splitmuxsink sinkpad
462 * names to muxer pad names they should feed. Splitmuxsink has some default
463 * mapping behaviour to link video to video pads and audio to audio pads
464 * that usually works fine. This property is useful if you need to ensure
465 * a particular mapping to muxed streams.
466 *
467 * The GstStructure contains string fields like so:
468 * splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469 *
470 * Since: 1.18
471 */
472 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473 g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474 "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475 GST_TYPE_STRUCTURE,
476 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478 /**
479 * GstSplitMuxSink::format-location:
480 * @splitmux: the #GstSplitMuxSink
481 * @fragment_id: the sequence number of the file to be created
482 *
483 * Returns: the location to be used for the next output file. This must be
484 * a newly-allocated string which will be freed with g_free() by the
485 * splitmuxsink element when it no longer needs it, so use g_strdup() or
486 * g_strdup_printf() or similar functions to allocate it.
487 */
488 signals[SIGNAL_FORMAT_LOCATION] =
489 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492 /**
493 * GstSplitMuxSink::format-location-full:
494 * @splitmux: the #GstSplitMuxSink
495 * @fragment_id: the sequence number of the file to be created
496 * @first_sample: A #GstSample containing the first buffer
497 * from the reference stream in the new file
498 *
499 * Returns: the location to be used for the next output file. This must be
500 * a newly-allocated string which will be freed with g_free() by the
501 * splitmuxsink element when it no longer needs it, so use g_strdup() or
502 * g_strdup_printf() or similar functions to allocate it.
503 *
504 * Since: 1.12
505 */
506 signals[SIGNAL_FORMAT_LOCATION_FULL] =
507 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509 GST_TYPE_SAMPLE);
510
511 /**
512 * GstSplitMuxSink::split-now:
513 * @splitmux: the #GstSplitMuxSink
514 *
515 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516 * The current GOP will be output to the new file.
517 *
518 * Since: 1.14
519 */
520 signals[SIGNAL_SPLIT_NOW] =
521 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524 G_TYPE_NONE, 0);
525
526 /**
527 * GstSplitMuxSink::split-after:
528 * @splitmux: the #GstSplitMuxSink
529 *
530 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531 * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532 *
533 * Since: 1.16
534 */
535 signals[SIGNAL_SPLIT_AFTER] =
536 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539 G_TYPE_NONE, 0);
540
541 /**
542 * GstSplitMuxSink::split-at-running-time:
543 * @splitmux: the #GstSplitMuxSink
544 *
545 * When called by the user, this action signal splits the video file (and
546 * begins a new one) as soon as the given running time is reached. If this
547 * action signal is called multiple times, running times are queued up and
548 * processed in the order they were given.
549 *
550 * Note that this is prone to race conditions, where said running time is
551 * reached and surpassed before we had a chance to split. The file will
552 * still split immediately, but in order to make sure that the split doesn't
553 * happen too late, it is recommended to call this action signal from
554 * something that will prevent further buffers from flowing into
555 * splitmuxsink before the split is completed, such as a pad probe before
556 * splitmuxsink.
557 *
558 *
559 * Since: 1.16
560 */
561 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565 NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567 /**
568 * GstSplitMuxSink::muxer-added:
569 * @splitmux: the #GstSplitMuxSink
570 * @muxer: the newly added muxer element
571 *
572 * Since: 1.14
573 */
574 signals[SIGNAL_MUXER_ADDED] =
575 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578 /**
579 * GstSplitMuxSink::sink-added:
580 * @splitmux: the #GstSplitMuxSink
581 * @sink: the newly added sink element
582 *
583 * Since: 1.14
584 */
585 signals[SIGNAL_SINK_ADDED] =
586 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589 klass->split_now = split_now;
590 klass->split_after = split_after;
591 klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
gst_splitmux_sink_init(GstSplitMuxSink * splitmux)595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597 g_mutex_init (&splitmux->lock);
598 g_mutex_init (&splitmux->state_lock);
599 g_cond_init (&splitmux->input_cond);
600 g_cond_init (&splitmux->output_cond);
601 g_queue_init (&splitmux->out_cmd_q);
602
603 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606 splitmux->max_files = DEFAULT_MAX_FILES;
607 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612 splitmux->threshold_timecode_str = NULL;
613
614 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616 splitmux->muxer_properties = NULL;
617 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618 splitmux->sink_properties = NULL;
619
620 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621 splitmux->split_requested = FALSE;
622 splitmux->do_split_next_gop = FALSE;
623 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626 g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
gst_splitmux_reset_elements(GstSplitMuxSink * splitmux)630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632 if (splitmux->muxer) {
633 gst_element_set_locked_state (splitmux->muxer, TRUE);
634 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636 }
637 if (splitmux->active_sink) {
638 gst_element_set_locked_state (splitmux->active_sink, TRUE);
639 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641 }
642
643 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
gst_splitmux_sink_dispose(GObject * object)647 gst_splitmux_sink_dispose (GObject * object)
648 {
649 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651 /* Calling parent dispose invalidates all child pointers */
652 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654 G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
gst_splitmux_sink_finalize(GObject * object)658 gst_splitmux_sink_finalize (GObject * object)
659 {
660 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662 g_cond_clear (&splitmux->input_cond);
663 g_cond_clear (&splitmux->output_cond);
664 g_mutex_clear (&splitmux->lock);
665 g_mutex_clear (&splitmux->state_lock);
666 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667 g_queue_clear (&splitmux->out_cmd_q);
668 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669 g_queue_clear (&splitmux->pending_input_gops);
670
671 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673 if (splitmux->muxerpad_map)
674 gst_structure_free (splitmux->muxerpad_map);
675
676 if (splitmux->provided_sink)
677 gst_object_unref (splitmux->provided_sink);
678 if (splitmux->provided_muxer)
679 gst_object_unref (splitmux->provided_muxer);
680
681 if (splitmux->muxer_factory)
682 g_free (splitmux->muxer_factory);
683 if (splitmux->muxer_preset)
684 g_free (splitmux->muxer_preset);
685 if (splitmux->muxer_properties)
686 gst_structure_free (splitmux->muxer_properties);
687 if (splitmux->sink_factory)
688 g_free (splitmux->sink_factory);
689 if (splitmux->sink_preset)
690 g_free (splitmux->sink_preset);
691 if (splitmux->sink_properties)
692 gst_structure_free (splitmux->sink_properties);
693
694 if (splitmux->threshold_timecode_str)
695 g_free (splitmux->threshold_timecode_str);
696 if (splitmux->tc_interval)
697 gst_video_time_code_interval_free (splitmux->tc_interval);
698
699 if (splitmux->times_to_split)
700 gst_queue_array_free (splitmux->times_to_split);
701
702 g_free (splitmux->location);
703
704 /* Make sure to free any un-released contexts. There should not be any,
705 * because the dispose will have freed all request pads though */
706 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707 g_list_free (splitmux->contexts);
708
709 G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713 * Set any time threshold to the muxer, if it has
714 * reserved-max-duration and reserved-duration-remaining
715 * properties. Called when creating/claiming the muxer
716 * in create_elements() */
717 static void
update_muxer_properties(GstSplitMuxSink * sink)718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720 GObjectClass *klass;
721 GstClockTime threshold_time;
722
723 sink->muxer_has_reserved_props = FALSE;
724 if (sink->muxer == NULL)
725 return;
726 klass = G_OBJECT_GET_CLASS (sink->muxer);
727 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728 return;
729 if (g_object_class_find_property (klass,
730 "reserved-duration-remaining") == NULL)
731 return;
732 sink->muxer_has_reserved_props = TRUE;
733
734 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735 GST_TIME_ARGS (sink->threshold_time));
736 GST_OBJECT_LOCK (sink);
737 threshold_time = sink->threshold_time;
738 GST_OBJECT_UNLOCK (sink);
739
740 if (threshold_time > 0) {
741 /* Tell the muxer how much space to reserve */
742 GstClockTime muxer_threshold = threshold_time;
743 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744 }
745 }
746
747 static void
gst_splitmux_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749 const GValue * value, GParamSpec * pspec)
750 {
751 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753 switch (prop_id) {
754 case PROP_LOCATION:{
755 GST_OBJECT_LOCK (splitmux);
756 g_free (splitmux->location);
757 splitmux->location = g_value_dup_string (value);
758 GST_OBJECT_UNLOCK (splitmux);
759 break;
760 }
761 case PROP_START_INDEX:
762 GST_OBJECT_LOCK (splitmux);
763 splitmux->start_index = g_value_get_int (value);
764 GST_OBJECT_UNLOCK (splitmux);
765 break;
766 case PROP_MAX_SIZE_BYTES:
767 GST_OBJECT_LOCK (splitmux);
768 splitmux->threshold_bytes = g_value_get_uint64 (value);
769 GST_OBJECT_UNLOCK (splitmux);
770 break;
771 case PROP_MAX_SIZE_TIME:
772 GST_OBJECT_LOCK (splitmux);
773 splitmux->threshold_time = g_value_get_uint64 (value);
774 GST_OBJECT_UNLOCK (splitmux);
775 break;
776 case PROP_MAX_SIZE_TIMECODE:
777 GST_OBJECT_LOCK (splitmux);
778 g_free (splitmux->threshold_timecode_str);
779 /* will be calculated later */
780 g_clear_pointer (&splitmux->tc_interval,
781 gst_video_time_code_interval_free);
782
783 splitmux->threshold_timecode_str = g_value_dup_string (value);
784 if (splitmux->threshold_timecode_str) {
785 splitmux->tc_interval =
786 gst_video_time_code_interval_new_from_string
787 (splitmux->threshold_timecode_str);
788 if (!splitmux->tc_interval) {
789 g_warning ("Wrong timecode string %s",
790 splitmux->threshold_timecode_str);
791 g_free (splitmux->threshold_timecode_str);
792 splitmux->threshold_timecode_str = NULL;
793 }
794 }
795 splitmux->next_fragment_start_tc_time =
796 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797 splitmux->fragment_start_time, NULL);
798 if (splitmux->tc_interval && splitmux->fragment_start_tc
799 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800 GST_WARNING_OBJECT (splitmux,
801 "Couldn't calculate next fragment start time for timecode mode");
802 }
803 GST_OBJECT_UNLOCK (splitmux);
804 break;
805 case PROP_SEND_KEYFRAME_REQUESTS:
806 GST_OBJECT_LOCK (splitmux);
807 splitmux->send_keyframe_requests = g_value_get_boolean (value);
808 GST_OBJECT_UNLOCK (splitmux);
809 break;
810 case PROP_MAX_FILES:
811 GST_OBJECT_LOCK (splitmux);
812 splitmux->max_files = g_value_get_uint (value);
813 GST_OBJECT_UNLOCK (splitmux);
814 break;
815 case PROP_MUXER_OVERHEAD:
816 GST_OBJECT_LOCK (splitmux);
817 splitmux->mux_overhead = g_value_get_double (value);
818 GST_OBJECT_UNLOCK (splitmux);
819 break;
820 case PROP_USE_ROBUST_MUXING:
821 GST_OBJECT_LOCK (splitmux);
822 splitmux->use_robust_muxing = g_value_get_boolean (value);
823 GST_OBJECT_UNLOCK (splitmux);
824 if (splitmux->use_robust_muxing)
825 update_muxer_properties (splitmux);
826 break;
827 case PROP_ALIGNMENT_THRESHOLD:
828 GST_OBJECT_LOCK (splitmux);
829 splitmux->alignment_threshold = g_value_get_uint64 (value);
830 GST_OBJECT_UNLOCK (splitmux);
831 break;
832 case PROP_SINK:
833 GST_OBJECT_LOCK (splitmux);
834 gst_clear_object (&splitmux->provided_sink);
835 splitmux->provided_sink = g_value_get_object (value);
836 if (splitmux->provided_sink)
837 gst_object_ref_sink (splitmux->provided_sink);
838 GST_OBJECT_UNLOCK (splitmux);
839 break;
840 case PROP_MUXER:
841 GST_OBJECT_LOCK (splitmux);
842 gst_clear_object (&splitmux->provided_muxer);
843 splitmux->provided_muxer = g_value_get_object (value);
844 if (splitmux->provided_muxer)
845 gst_object_ref_sink (splitmux->provided_muxer);
846 GST_OBJECT_UNLOCK (splitmux);
847 break;
848 case PROP_RESET_MUXER:
849 GST_OBJECT_LOCK (splitmux);
850 splitmux->reset_muxer = g_value_get_boolean (value);
851 GST_OBJECT_UNLOCK (splitmux);
852 break;
853 case PROP_ASYNC_FINALIZE:
854 GST_OBJECT_LOCK (splitmux);
855 splitmux->async_finalize = g_value_get_boolean (value);
856 GST_OBJECT_UNLOCK (splitmux);
857 break;
858 case PROP_MUXER_FACTORY:
859 GST_OBJECT_LOCK (splitmux);
860 if (splitmux->muxer_factory)
861 g_free (splitmux->muxer_factory);
862 splitmux->muxer_factory = g_value_dup_string (value);
863 GST_OBJECT_UNLOCK (splitmux);
864 break;
865 case PROP_MUXER_PRESET:
866 GST_OBJECT_LOCK (splitmux);
867 if (splitmux->muxer_preset)
868 g_free (splitmux->muxer_preset);
869 splitmux->muxer_preset = g_value_dup_string (value);
870 GST_OBJECT_UNLOCK (splitmux);
871 break;
872 case PROP_MUXER_PROPERTIES:
873 GST_OBJECT_LOCK (splitmux);
874 if (splitmux->muxer_properties)
875 gst_structure_free (splitmux->muxer_properties);
876 if (gst_value_get_structure (value))
877 splitmux->muxer_properties =
878 gst_structure_copy (gst_value_get_structure (value));
879 else
880 splitmux->muxer_properties = NULL;
881 GST_OBJECT_UNLOCK (splitmux);
882 break;
883 case PROP_SINK_FACTORY:
884 GST_OBJECT_LOCK (splitmux);
885 if (splitmux->sink_factory)
886 g_free (splitmux->sink_factory);
887 splitmux->sink_factory = g_value_dup_string (value);
888 GST_OBJECT_UNLOCK (splitmux);
889 break;
890 case PROP_SINK_PRESET:
891 GST_OBJECT_LOCK (splitmux);
892 if (splitmux->sink_preset)
893 g_free (splitmux->sink_preset);
894 splitmux->sink_preset = g_value_dup_string (value);
895 GST_OBJECT_UNLOCK (splitmux);
896 break;
897 case PROP_SINK_PROPERTIES:
898 GST_OBJECT_LOCK (splitmux);
899 if (splitmux->sink_properties)
900 gst_structure_free (splitmux->sink_properties);
901 if (gst_value_get_structure (value))
902 splitmux->sink_properties =
903 gst_structure_copy (gst_value_get_structure (value));
904 else
905 splitmux->sink_properties = NULL;
906 GST_OBJECT_UNLOCK (splitmux);
907 break;
908 case PROP_MUXERPAD_MAP:
909 {
910 const GstStructure *s = gst_value_get_structure (value);
911 GST_SPLITMUX_LOCK (splitmux);
912 if (splitmux->muxerpad_map) {
913 gst_structure_free (splitmux->muxerpad_map);
914 }
915 if (s)
916 splitmux->muxerpad_map = gst_structure_copy (s);
917 else
918 splitmux->muxerpad_map = NULL;
919 GST_SPLITMUX_UNLOCK (splitmux);
920 break;
921 }
922 default:
923 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
924 break;
925 }
926 }
927
928 static void
gst_splitmux_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930 GValue * value, GParamSpec * pspec)
931 {
932 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
933
934 switch (prop_id) {
935 case PROP_LOCATION:
936 GST_OBJECT_LOCK (splitmux);
937 g_value_set_string (value, splitmux->location);
938 GST_OBJECT_UNLOCK (splitmux);
939 break;
940 case PROP_START_INDEX:
941 GST_OBJECT_LOCK (splitmux);
942 g_value_set_int (value, splitmux->start_index);
943 GST_OBJECT_UNLOCK (splitmux);
944 break;
945 case PROP_MAX_SIZE_BYTES:
946 GST_OBJECT_LOCK (splitmux);
947 g_value_set_uint64 (value, splitmux->threshold_bytes);
948 GST_OBJECT_UNLOCK (splitmux);
949 break;
950 case PROP_MAX_SIZE_TIME:
951 GST_OBJECT_LOCK (splitmux);
952 g_value_set_uint64 (value, splitmux->threshold_time);
953 GST_OBJECT_UNLOCK (splitmux);
954 break;
955 case PROP_MAX_SIZE_TIMECODE:
956 GST_OBJECT_LOCK (splitmux);
957 g_value_set_string (value, splitmux->threshold_timecode_str);
958 GST_OBJECT_UNLOCK (splitmux);
959 break;
960 case PROP_SEND_KEYFRAME_REQUESTS:
961 GST_OBJECT_LOCK (splitmux);
962 g_value_set_boolean (value, splitmux->send_keyframe_requests);
963 GST_OBJECT_UNLOCK (splitmux);
964 break;
965 case PROP_MAX_FILES:
966 GST_OBJECT_LOCK (splitmux);
967 g_value_set_uint (value, splitmux->max_files);
968 GST_OBJECT_UNLOCK (splitmux);
969 break;
970 case PROP_MUXER_OVERHEAD:
971 GST_OBJECT_LOCK (splitmux);
972 g_value_set_double (value, splitmux->mux_overhead);
973 GST_OBJECT_UNLOCK (splitmux);
974 break;
975 case PROP_USE_ROBUST_MUXING:
976 GST_OBJECT_LOCK (splitmux);
977 g_value_set_boolean (value, splitmux->use_robust_muxing);
978 GST_OBJECT_UNLOCK (splitmux);
979 break;
980 case PROP_ALIGNMENT_THRESHOLD:
981 GST_OBJECT_LOCK (splitmux);
982 g_value_set_uint64 (value, splitmux->alignment_threshold);
983 GST_OBJECT_UNLOCK (splitmux);
984 break;
985 case PROP_SINK:
986 GST_OBJECT_LOCK (splitmux);
987 g_value_set_object (value, splitmux->provided_sink);
988 GST_OBJECT_UNLOCK (splitmux);
989 break;
990 case PROP_MUXER:
991 GST_OBJECT_LOCK (splitmux);
992 g_value_set_object (value, splitmux->provided_muxer);
993 GST_OBJECT_UNLOCK (splitmux);
994 break;
995 case PROP_RESET_MUXER:
996 GST_OBJECT_LOCK (splitmux);
997 g_value_set_boolean (value, splitmux->reset_muxer);
998 GST_OBJECT_UNLOCK (splitmux);
999 break;
1000 case PROP_ASYNC_FINALIZE:
1001 GST_OBJECT_LOCK (splitmux);
1002 g_value_set_boolean (value, splitmux->async_finalize);
1003 GST_OBJECT_UNLOCK (splitmux);
1004 break;
1005 case PROP_MUXER_FACTORY:
1006 GST_OBJECT_LOCK (splitmux);
1007 g_value_set_string (value, splitmux->muxer_factory);
1008 GST_OBJECT_UNLOCK (splitmux);
1009 break;
1010 case PROP_MUXER_PRESET:
1011 GST_OBJECT_LOCK (splitmux);
1012 g_value_set_string (value, splitmux->muxer_preset);
1013 GST_OBJECT_UNLOCK (splitmux);
1014 break;
1015 case PROP_MUXER_PROPERTIES:
1016 GST_OBJECT_LOCK (splitmux);
1017 gst_value_set_structure (value, splitmux->muxer_properties);
1018 GST_OBJECT_UNLOCK (splitmux);
1019 break;
1020 case PROP_SINK_FACTORY:
1021 GST_OBJECT_LOCK (splitmux);
1022 g_value_set_string (value, splitmux->sink_factory);
1023 GST_OBJECT_UNLOCK (splitmux);
1024 break;
1025 case PROP_SINK_PRESET:
1026 GST_OBJECT_LOCK (splitmux);
1027 g_value_set_string (value, splitmux->sink_preset);
1028 GST_OBJECT_UNLOCK (splitmux);
1029 break;
1030 case PROP_SINK_PROPERTIES:
1031 GST_OBJECT_LOCK (splitmux);
1032 gst_value_set_structure (value, splitmux->sink_properties);
1033 GST_OBJECT_UNLOCK (splitmux);
1034 break;
1035 case PROP_MUXERPAD_MAP:
1036 GST_SPLITMUX_LOCK (splitmux);
1037 gst_value_set_structure (value, splitmux->muxerpad_map);
1038 GST_SPLITMUX_UNLOCK (splitmux);
1039 break;
1040 default:
1041 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1042 break;
1043 }
1044 }
1045
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1049 {
1050 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1051
1052 if (GST_CLOCK_TIME_IS_VALID (val)) {
1053 gboolean sign =
1054 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1055 if (sign > 0)
1056 res = val;
1057 else if (sign < 0)
1058 res = -val;
1059 }
1060 return res;
1061 }
1062
1063 static void
mq_stream_ctx_reset(MqStreamCtx * ctx)1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1065 {
1066 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070 g_queue_clear (&ctx->queued_bufs);
1071 }
1072
1073 static MqStreamCtx *
mq_stream_ctx_new(GstSplitMuxSink * splitmux)1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1075 {
1076 MqStreamCtx *ctx;
1077
1078 ctx = g_new0 (MqStreamCtx, 1);
1079 ctx->splitmux = splitmux;
1080 g_queue_init (&ctx->queued_bufs);
1081 mq_stream_ctx_reset (ctx);
1082
1083 return ctx;
1084 }
1085
1086 static void
mq_stream_ctx_free(MqStreamCtx * ctx)1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 {
1089 if (ctx->q) {
1090 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1091
1092 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1093
1094 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095 gst_element_set_locked_state (ctx->q, TRUE);
1096 gst_element_set_state (ctx->q, GST_STATE_NULL);
1097 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098 gst_object_unref (parent);
1099 }
1100 gst_object_unref (ctx->q);
1101 }
1102 gst_object_unref (ctx->sinkpad);
1103 gst_object_unref (ctx->srcpad);
1104 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105 g_queue_clear (&ctx->queued_bufs);
1106 g_free (ctx);
1107 }
1108
1109 static void
send_fragment_opened_closed_msg(GstSplitMuxSink * splitmux,gboolean opened,GstElement * sink)1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111 GstElement * sink)
1112 {
1113 gchar *location = NULL;
1114 GstMessage *msg;
1115 const gchar *msg_name = opened ?
1116 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118
1119 if (!opened) {
1120 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1121 if (rtime)
1122 running_time = *rtime;
1123 }
1124
1125 if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126 "location") != NULL)
1127 g_object_get (sink, "location", &location, NULL);
1128
1129 GST_DEBUG_OBJECT (splitmux,
1130 "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131 msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1132
1133 /* If it's in the middle of a teardown, the reference_ctc might have become
1134 * NULL */
1135 if (splitmux->reference_ctx) {
1136 msg = gst_message_new_element (GST_OBJECT (splitmux),
1137 gst_structure_new (msg_name,
1138 "location", G_TYPE_STRING, location,
1139 "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140 "sink", GST_TYPE_ELEMENT, sink, NULL));
1141 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1142 }
1143
1144 g_free (location);
1145 }
1146
1147 static void
send_eos_async(GstSplitMuxSink * splitmux,AsyncEosHelper * helper)1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1149 {
1150 GstEvent *eos;
1151 GstPad *pad;
1152 MqStreamCtx *ctx;
1153
1154 eos = gst_event_new_eos ();
1155 pad = helper->pad;
1156 ctx = helper->ctx;
1157
1158 GST_SPLITMUX_LOCK (splitmux);
1159 if (!pad)
1160 pad = gst_pad_get_peer (ctx->srcpad);
1161 GST_SPLITMUX_UNLOCK (splitmux);
1162
1163 gst_pad_send_event (pad, eos);
1164 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1165
1166 gst_object_unref (pad);
1167 g_free (helper);
1168 }
1169
1170 /* Called with lock held, drops the lock to send EOS to the
1171 * pad
1172 */
1173 static void
send_eos(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1175 {
1176 GstEvent *eos;
1177 GstPad *pad;
1178
1179 eos = gst_event_new_eos ();
1180 pad = gst_pad_get_peer (ctx->srcpad);
1181
1182 ctx->out_eos = TRUE;
1183
1184 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185 GST_SPLITMUX_UNLOCK (splitmux);
1186 gst_pad_send_event (pad, eos);
1187 GST_SPLITMUX_LOCK (splitmux);
1188
1189 gst_object_unref (pad);
1190 }
1191
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193 * to happen in another thread */
1194 static void
eos_context_async(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1196 {
1197 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198 GstPad *srcpad, *sinkpad;
1199
1200 srcpad = ctx->srcpad;
1201 sinkpad = gst_pad_get_peer (srcpad);
1202
1203 helper->ctx = ctx;
1204 helper->pad = sinkpad; /* Takes the reference */
1205
1206 ctx->out_eos_async_done = TRUE;
1207
1208 /* There used to be a bug here, where we had to explicitly remove
1209 * the SINK flag so that GstBin would ignore it for EOS purposes.
1210 * That fixed a race where if splitmuxsink really reaches EOS
1211 * before an asynchronous background element has finished, then
1212 * the bin wouldn't actually send EOS to the pipeline. Even after
1213 * finishing and removing the old element, the bin didn't re-check
1214 * EOS status on removing a SINK element. That bug was fixed
1215 * in core. */
1216 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217 sinkpad, ctx);
1218
1219 g_assert_nonnull (helper->pad);
1220 gst_element_call_async (GST_ELEMENT (splitmux),
1221 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 }
1223
1224 /* Called with lock held. TRUE iff all contexts have a
1225 * pending (or delivered) async eos event */
1226 static gboolean
all_contexts_are_async_eos(GstSplitMuxSink * splitmux)1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1228 {
1229 gboolean ret = TRUE;
1230 GList *item;
1231
1232 for (item = splitmux->contexts; item; item = item->next) {
1233 MqStreamCtx *ctx = item->data;
1234 ret &= ctx->out_eos_async_done;
1235 }
1236 return ret;
1237 }
1238
1239 /* Called with splitmux lock held to check if this output
1240 * context needs to sleep to wait for the release of the
1241 * next GOP, or to send EOS to close out the current file
1242 */
1243 static GstFlowReturn
complete_or_wait_on_out(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1245 {
1246 if (ctx->caps_change)
1247 return GST_FLOW_OK;
1248
1249 do {
1250 /* When first starting up, the reference stream has to output
1251 * the first buffer to prepare the muxer and sink */
1252 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1254
1255 if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256 && my_max_out_running_time != G_MAXINT64) {
1257 my_max_out_running_time -= splitmux->alignment_threshold;
1258 GST_LOG_OBJECT (ctx->srcpad,
1259 "Max out running time currently %" GST_STIME_FORMAT
1260 ", with threshold applied it is %" GST_STIME_FORMAT,
1261 GST_STIME_ARGS (splitmux->max_out_running_time),
1262 GST_STIME_ARGS (my_max_out_running_time));
1263 }
1264
1265 if (ctx->flushing
1266 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267 return GST_FLOW_FLUSHING;
1268
1269 GST_LOG_OBJECT (ctx->srcpad,
1270 "Checking running time %" GST_STIME_FORMAT " against max %"
1271 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272 GST_STIME_ARGS (my_max_out_running_time));
1273
1274 if (can_output) {
1275 if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276 ctx->out_running_time < my_max_out_running_time) {
1277 return GST_FLOW_OK;
1278 }
1279
1280 switch (splitmux->output_state) {
1281 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282 /* We only get here if we've finished outputting a GOP and need to know
1283 * what to do next */
1284 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286 continue;
1287
1288 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289 case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290 /* We've reached the max out running_time to get here, so end this file now */
1291 if (ctx->out_eos == FALSE) {
1292 if (splitmux->async_finalize) {
1293 /* We must set EOS asynchronously at this point. We cannot defer
1294 * it, because we need all contexts to wake up, for the
1295 * reference context to eventually give us something at
1296 * START_NEXT_FILE. Otherwise, collectpads might choose another
1297 * context to give us the first buffer, and format-location-full
1298 * will not contain a valid sample. */
1299 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300 GINT_TO_POINTER (1));
1301 eos_context_async (ctx, splitmux);
1302 if (all_contexts_are_async_eos (splitmux)) {
1303 GST_INFO_OBJECT (splitmux,
1304 "All contexts are async_eos. Moving to the next file.");
1305 /* We can start the next file once we've asked each pad to go EOS */
1306 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1308 continue;
1309 }
1310 } else {
1311 send_eos (splitmux, ctx);
1312 continue;
1313 }
1314 } else {
1315 GST_INFO_OBJECT (splitmux,
1316 "At end-of-file state, but context %p is already EOS", ctx);
1317 }
1318 break;
1319 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320 if (ctx->is_reference) {
1321 GstFlowReturn ret = GST_FLOW_OK;
1322
1323 /* Special handling on the reference ctx to start new fragments
1324 * and collect commands from the command queue */
1325 /* drops the splitmux lock briefly: */
1326 /* We must have reference ctx in order for format-location-full to
1327 * have a sample */
1328 ret = start_next_fragment (splitmux, ctx);
1329 if (ret != GST_FLOW_OK)
1330 return ret;
1331
1332 continue;
1333 }
1334 break;
1335 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1336 do {
1337 SplitMuxOutputCommand *cmd =
1338 g_queue_pop_tail (&splitmux->out_cmd_q);
1339 if (cmd != NULL) {
1340 /* If we pop the last command, we need to make our queues bigger */
1341 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342 grow_blocked_queues (splitmux);
1343
1344 if (cmd->start_new_fragment) {
1345 if (splitmux->muxed_out_bytes > 0) {
1346 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1348 } else {
1349 GST_DEBUG_OBJECT (splitmux,
1350 "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351 }
1352 } else {
1353 GST_DEBUG_OBJECT (splitmux,
1354 "Got new output cmd for time %" GST_STIME_FORMAT,
1355 GST_STIME_ARGS (cmd->max_output_ts));
1356
1357 /* Extend the output range immediately */
1358 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359 || cmd->max_output_ts > splitmux->max_out_running_time)
1360 splitmux->max_out_running_time = cmd->max_output_ts;
1361 GST_DEBUG_OBJECT (splitmux,
1362 "Max out running time now %" GST_STIME_FORMAT,
1363 GST_STIME_ARGS (splitmux->max_out_running_time));
1364 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1365 }
1366 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1367
1368 out_cmd_buf_free (cmd);
1369 break;
1370 } else {
1371 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1372 }
1373 } while (!ctx->flushing && splitmux->output_state ==
1374 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375 /* loop and re-check the state */
1376 continue;
1377 }
1378 case SPLITMUX_OUTPUT_STATE_STOPPED:
1379 return GST_FLOW_FLUSHING;
1380 }
1381 } else {
1382 GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383 }
1384
1385 GST_INFO_OBJECT (ctx->srcpad,
1386 "Sleeping for running time %"
1387 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388 GST_STIME_ARGS (ctx->out_running_time),
1389 GST_STIME_ARGS (splitmux->max_out_running_time));
1390 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391 GST_INFO_OBJECT (ctx->srcpad,
1392 "Woken for new max running time %" GST_STIME_FORMAT,
1393 GST_STIME_ARGS (splitmux->max_out_running_time));
1394 }
1395 while (1);
1396
1397 return GST_FLOW_OK;
1398 }
1399
1400 static GstClockTime
calculate_next_max_timecode(GstSplitMuxSink * splitmux,const GstVideoTimeCode * cur_tc,GstClockTime running_time,GstVideoTimeCode ** next_tc)1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403 GstVideoTimeCode ** next_tc)
1404 {
1405 GstVideoTimeCode *target_tc;
1406 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1407
1408 if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409 return GST_CLOCK_TIME_NONE;
1410
1411 target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1412 if (!target_tc) {
1413 GST_ELEMENT_ERROR (splitmux,
1414 STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415 return GST_CLOCK_TIME_NONE;
1416 }
1417
1418 /* Convert to ns */
1419 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1421
1422 /* Add running_time, accounting for wraparound. */
1423 if (target_tc_time >= cur_tc_time) {
1424 next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1425 } else {
1426 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1427
1428 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429 (cur_tc->config.fps_d == 1001)) {
1430 /* Checking fps_d is probably unneeded, but better safe than sorry
1431 * (e.g. someone accidentally set a flag) */
1432 GstVideoTimeCode *tc_for_offset;
1433
1434 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435 * but slightly less. Calculate that duration from a fake timecode. The
1436 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437 * is to add one frame to 23:59:59;29 */
1438 tc_for_offset =
1439 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440 NULL, cur_tc->config.flags, 23, 59, 59,
1441 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1442 day_in_ns =
1443 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445 cur_tc->config.fps_n);
1446 gst_video_time_code_free (tc_for_offset);
1447 }
1448 next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449 }
1450
1451 #ifndef GST_DISABLE_GST_DEBUG
1452 {
1453 gchar *next_max_tc_str, *cur_tc_str;
1454
1455 cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456 next_max_tc_str = gst_video_time_code_to_string (target_tc);
1457
1458 GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459 " from ref timecode %s time: %" GST_TIME_FORMAT,
1460 next_max_tc_str,
1461 GST_TIME_ARGS (next_max_tc_time),
1462 cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1463
1464 g_free (next_max_tc_str);
1465 g_free (cur_tc_str);
1466 }
1467 #endif
1468
1469 if (next_tc)
1470 *next_tc = target_tc;
1471 else
1472 gst_video_time_code_free (target_tc);
1473
1474 return next_max_tc_time;
1475 }
1476
1477 static gboolean
request_next_keyframe(GstSplitMuxSink * splitmux,GstBuffer * buffer,GstClockTimeDiff running_time_dts)1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479 GstClockTimeDiff running_time_dts)
1480 {
1481 GstEvent *ev;
1482 GstClockTime target_time;
1483 gboolean timecode_based = FALSE;
1484 GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485 GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486 GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487 GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488 InputGop *newest_gop = NULL;
1489 GList *l;
1490
1491 if (!splitmux->send_keyframe_requests)
1492 return TRUE;
1493
1494 /* Find the newest GOP where we passed in DTS the start PTS */
1495 for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496 InputGop *tmp = l->data;
1497
1498 GST_TRACE_OBJECT (splitmux,
1499 "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500 " and start time %" GST_STIME_FORMAT,
1501 GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1502
1503 if (tmp->sent_fku) {
1504 GST_DEBUG_OBJECT (splitmux,
1505 "Already checked for a keyframe request for this GOP");
1506 return TRUE;
1507 }
1508
1509 if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510 tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511 running_time_dts >= tmp->start_time_pts) {
1512 GST_DEBUG_OBJECT (splitmux,
1513 "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514 GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515 GST_STIME_ARGS (tmp->start_time));
1516 newest_gop = tmp;
1517 break;
1518 }
1519 }
1520
1521 if (!newest_gop) {
1522 GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1523 return TRUE;
1524 }
1525
1526 if (splitmux->tc_interval) {
1527 if (newest_gop->start_tc
1528 && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529 GstVideoTimeCode *next_tc = NULL;
1530 max_tc_time =
1531 calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532 newest_gop->start_time, &next_tc);
1533
1534 /* calculate the next expected keyframe time to prevent too early fku
1535 * event */
1536 if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1537 next_max_tc_time =
1538 calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539 }
1540 if (next_tc)
1541 gst_video_time_code_free (next_tc);
1542
1543 timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544 GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1545
1546 if (!timecode_based) {
1547 GST_WARNING_OBJECT (splitmux,
1548 "Couldn't calculate maximum fragment time for timecode mode");
1549 }
1550 } else {
1551 /* This can happen in the presence of GAP events that trigger
1552 * a new fragment start */
1553 GST_WARNING_OBJECT (splitmux,
1554 "No buffer available to calculate next timecode");
1555 }
1556 }
1557
1558 if ((splitmux->threshold_time == 0 && !timecode_based)
1559 || splitmux->threshold_bytes != 0)
1560 return TRUE;
1561
1562 if (timecode_based) {
1563 /* We might have rounding errors: aim slightly earlier */
1564 if (max_tc_time >= tc_rounding_error) {
1565 target_time = max_tc_time - tc_rounding_error;
1566 } else {
1567 /* unreliable target time */
1568 GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569 " is smaller than allowed rounding error, set it to zero",
1570 GST_TIME_ARGS (max_tc_time));
1571 target_time = 0;
1572 }
1573
1574 if (next_max_tc_time >= tc_rounding_error) {
1575 next_fku_time = next_max_tc_time - tc_rounding_error;
1576 } else {
1577 /* unreliable target time */
1578 GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579 " is smaller than allowed rounding error, set it to zero",
1580 GST_TIME_ARGS (next_max_tc_time));
1581 next_fku_time = 0;
1582 }
1583 } else {
1584 target_time = newest_gop->start_time + splitmux->threshold_time;
1585 }
1586
1587 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588 GstClockTime allowed_time = splitmux->next_fku_time;
1589
1590 if (timecode_based) {
1591 if (allowed_time >= tc_rounding_error) {
1592 allowed_time -= tc_rounding_error;
1593 } else {
1594 /* unreliable next force key unit time */
1595 GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1596 GST_TIME_FORMAT
1597 " is smaller than allowed rounding error, set it to zero",
1598 GST_TIME_ARGS (splitmux->next_fku_time));
1599 allowed_time = 0;
1600 }
1601 }
1602
1603 if (target_time < allowed_time) {
1604 GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606 ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607 GST_TIME_ARGS (target_time),
1608 GST_TIME_ARGS (splitmux->next_fku_time),
1609 GST_TIME_ARGS (allowed_time));
1610
1611 return TRUE;
1612 } else if (allowed_time != splitmux->next_fku_time &&
1613 target_time < splitmux->next_fku_time) {
1614 GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616 ", but the difference is smaller than allowed rounding error",
1617 GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1618 }
1619 }
1620
1621 if (!timecode_based) {
1622 next_fku_time = target_time + splitmux->threshold_time;
1623 }
1624
1625 GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626 ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627 GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1628
1629 newest_gop->sent_fku = TRUE;
1630
1631 splitmux->next_fku_time = next_fku_time;
1632 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1633
1634 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1635 }
1636
1637 static GstPadProbeReturn
handle_mq_output(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1639 {
1640 GstSplitMuxSink *splitmux = ctx->splitmux;
1641 MqStreamBuf *buf_info = NULL;
1642 GstFlowReturn ret = GST_FLOW_OK;
1643
1644 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1645
1646 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648 g_warning ("Buffer list handling not implemented");
1649 return GST_PAD_PROBE_DROP;
1650 }
1651 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653 GstEvent *event = gst_pad_probe_info_get_event (info);
1654 gboolean locked = FALSE, wait = !ctx->is_reference;
1655
1656 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1657
1658 switch (GST_EVENT_TYPE (event)) {
1659 case GST_EVENT_SEGMENT:
1660 gst_event_copy_segment (event, &ctx->out_segment);
1661 break;
1662 case GST_EVENT_FLUSH_STOP:
1663 GST_SPLITMUX_LOCK (splitmux);
1664 locked = TRUE;
1665 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667 g_queue_clear (&ctx->queued_bufs);
1668 g_queue_clear (&ctx->queued_bufs);
1669 /* If this is the reference context, we just threw away any queued keyframes */
1670 if (ctx->is_reference)
1671 splitmux->queued_keyframes = 0;
1672 ctx->flushing = FALSE;
1673 wait = FALSE;
1674 break;
1675 case GST_EVENT_FLUSH_START:
1676 GST_SPLITMUX_LOCK (splitmux);
1677 locked = TRUE;
1678 GST_LOG_OBJECT (pad, "Flush start");
1679 ctx->flushing = TRUE;
1680 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1682 break;
1683 case GST_EVENT_EOS:
1684 GST_SPLITMUX_LOCK (splitmux);
1685 locked = TRUE;
1686 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1687 goto beach;
1688 ctx->out_eos = TRUE;
1689
1690 if (ctx == splitmux->reference_ctx) {
1691 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1693 }
1694
1695 GST_INFO_OBJECT (splitmux,
1696 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1697 break;
1698 case GST_EVENT_GAP:{
1699 GstClockTime gap_ts;
1700 GstClockTimeDiff rtime;
1701
1702 gst_event_parse_gap (event, &gap_ts, NULL);
1703 if (gap_ts == GST_CLOCK_TIME_NONE)
1704 break;
1705
1706 GST_SPLITMUX_LOCK (splitmux);
1707 locked = TRUE;
1708
1709 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1710 goto beach;
1711
1712 /* When we get a gap event on the
1713 * reference stream and we're trying to open a
1714 * new file, we need to store it until we get
1715 * the buffer afterwards
1716 */
1717 if (ctx->is_reference &&
1718 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720 gst_event_replace (&ctx->pending_gap, event);
1721 GST_SPLITMUX_UNLOCK (splitmux);
1722 return GST_PAD_PROBE_HANDLED;
1723 }
1724
1725 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1726
1727 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728 GST_STIME_ARGS (rtime));
1729
1730 if (rtime != GST_CLOCK_STIME_NONE) {
1731 ctx->out_running_time = rtime;
1732 complete_or_wait_on_out (splitmux, ctx);
1733 }
1734 break;
1735 }
1736 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737 const GstStructure *s;
1738 GstClockTimeDiff ts = 0;
1739
1740 s = gst_event_get_structure (event);
1741 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1742 break;
1743
1744 gst_structure_get_int64 (s, "timestamp", &ts);
1745
1746 GST_SPLITMUX_LOCK (splitmux);
1747 locked = TRUE;
1748
1749 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1750 goto beach;
1751 ctx->out_running_time = ts;
1752 if (!ctx->is_reference)
1753 ret = complete_or_wait_on_out (splitmux, ctx);
1754 GST_SPLITMUX_UNLOCK (splitmux);
1755 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756 return GST_PAD_PROBE_DROP;
1757 }
1758 case GST_EVENT_CAPS:{
1759 GstPad *peer;
1760
1761 if (!ctx->is_reference)
1762 break;
1763
1764 peer = gst_pad_get_peer (pad);
1765 if (peer) {
1766 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1767
1768 gst_object_unref (peer);
1769
1770 if (ok)
1771 break;
1772
1773 } else {
1774 break;
1775 }
1776 /* This is in the case the muxer doesn't allow this change of caps */
1777 GST_SPLITMUX_LOCK (splitmux);
1778 locked = TRUE;
1779 ctx->caps_change = TRUE;
1780
1781 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782 GST_DEBUG_OBJECT (splitmux,
1783 "New caps were not accepted. Switching output file");
1784 if (ctx->out_eos == FALSE) {
1785 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1787 }
1788 }
1789
1790 /* Lets it fall through, if it fails again, then the muxer just can't
1791 * support this format, but at least we have a closed file.
1792 */
1793 break;
1794 }
1795 default:
1796 break;
1797 }
1798
1799 /* We need to make sure events aren't passed
1800 * until the muxer / sink are ready for it */
1801 if (!locked)
1802 GST_SPLITMUX_LOCK (splitmux);
1803 if (wait)
1804 ret = complete_or_wait_on_out (splitmux, ctx);
1805 GST_SPLITMUX_UNLOCK (splitmux);
1806
1807 /* Don't try to forward sticky events before the next buffer is there
1808 * because it would cause a new file to be created without the first
1809 * buffer being available.
1810 */
1811 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813 gst_event_unref (event);
1814 return GST_PAD_PROBE_HANDLED;
1815 } else {
1816 return GST_PAD_PROBE_PASS;
1817 }
1818 }
1819
1820 /* Allow everything through until the configured next stopping point */
1821 GST_SPLITMUX_LOCK (splitmux);
1822
1823 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824 if (buf_info == NULL) {
1825 /* Can only happen due to a poorly timed flush */
1826 ret = GST_FLOW_FLUSHING;
1827 goto beach;
1828 }
1829
1830 /* If we have popped a keyframe, decrement the queued_gop count */
1831 if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832 splitmux->queued_keyframes--;
1833
1834 ctx->out_running_time = buf_info->run_ts;
1835 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1836
1837 GST_LOG_OBJECT (splitmux,
1838 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839 " size %" G_GUINT64_FORMAT,
1840 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1841
1842 ctx->caps_change = FALSE;
1843
1844 ret = complete_or_wait_on_out (splitmux, ctx);
1845
1846 splitmux->muxed_out_bytes += buf_info->buf_size;
1847
1848 #ifndef GST_DISABLE_GST_DEBUG
1849 {
1850 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852 " run ts %" GST_STIME_FORMAT, buf,
1853 GST_STIME_ARGS (ctx->out_running_time));
1854 }
1855 #endif
1856
1857 ctx->cur_out_buffer = NULL;
1858 GST_SPLITMUX_UNLOCK (splitmux);
1859
1860 /* pending_gap is protected by the STREAM lock */
1861 if (ctx->pending_gap) {
1862 /* If we previously stored a gap event, send it now */
1863 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1864
1865 GST_DEBUG_OBJECT (splitmux,
1866 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1867
1868 gst_pad_send_event (peer, ctx->pending_gap);
1869 ctx->pending_gap = NULL;
1870
1871 gst_object_unref (peer);
1872 }
1873
1874 mq_stream_buf_free (buf_info);
1875
1876 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877 return GST_PAD_PROBE_PASS;
1878
1879 beach:
1880 GST_SPLITMUX_UNLOCK (splitmux);
1881 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882 return GST_PAD_PROBE_DROP;
1883 }
1884
1885 static gboolean
resend_sticky(GstPad * pad,GstEvent ** event,GstPad * peer)1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1887 {
1888 return gst_pad_send_event (peer, gst_event_ref (*event));
1889 }
1890
1891 static void
unlock_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1893 {
1894 if (ctx->fragment_block_id > 0) {
1895 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896 ctx->fragment_block_id = 0;
1897 }
1898 }
1899
1900 static void
restart_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1902 {
1903 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1904
1905 gst_pad_sticky_events_foreach (ctx->srcpad,
1906 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1907
1908 /* Clear EOS flag if not actually EOS */
1909 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910 ctx->out_eos_async_done = ctx->out_eos;
1911
1912 gst_object_unref (peer);
1913 }
1914
1915 static void
relink_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1917 {
1918 GstPad *sinkpad, *srcpad, *newpad;
1919 GstPadTemplate *templ;
1920
1921 srcpad = ctx->srcpad;
1922 sinkpad = gst_pad_get_peer (srcpad);
1923
1924 templ = sinkpad->padtemplate;
1925 newpad =
1926 gst_element_request_pad (splitmux->muxer, templ,
1927 GST_PAD_NAME (sinkpad), NULL);
1928
1929 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1930 newpad);
1931 if (!gst_pad_unlink (srcpad, sinkpad)) {
1932 gst_object_unref (sinkpad);
1933 goto fail;
1934 }
1935 if (gst_pad_link_full (srcpad, newpad,
1936 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937 gst_element_release_request_pad (splitmux->muxer, newpad);
1938 gst_object_unref (sinkpad);
1939 gst_object_unref (newpad);
1940 goto fail;
1941 }
1942 gst_object_unref (newpad);
1943 gst_object_unref (sinkpad);
1944 return;
1945
1946 fail:
1947 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948 ("Could not create the new muxer/sink"), NULL);
1949 }
1950
1951 static GstPadProbeReturn
_block_pad(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1953 {
1954 return GST_PAD_PROBE_OK;
1955 }
1956
1957 static void
block_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1959 {
1960 ctx->fragment_block_id =
1961 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1962 NULL, NULL);
1963 }
1964
1965 static gboolean
_set_property_from_structure(GQuark field_id,const GValue * value,gpointer user_data)1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1967 gpointer user_data)
1968 {
1969 const gchar *property_name = g_quark_to_string (field_id);
1970 GObject *element = G_OBJECT (user_data);
1971
1972 g_object_set_property (element, property_name, value);
1973
1974 return TRUE;
1975 }
1976
1977 static void
_lock_and_set_to_null(GstElement * element,GstSplitMuxSink * splitmux)1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1979 {
1980 gst_element_set_locked_state (element, TRUE);
1981 gst_element_set_state (element, GST_STATE_NULL);
1982 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983 gst_bin_remove (GST_BIN (splitmux), element);
1984 }
1985
1986
1987 static void
_send_event(const GValue * value,gpointer user_data)1988 _send_event (const GValue * value, gpointer user_data)
1989 {
1990 GstPad *pad = g_value_get_object (value);
1991 GstEvent *ev = user_data;
1992
1993 gst_pad_send_event (pad, gst_event_ref (ev));
1994 }
1995
1996 /* Called with lock held when a fragment
1997 * reaches EOS and it is time to restart
1998 * a new fragment
1999 */
2000 static GstFlowReturn
start_next_fragment(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2002 {
2003 GstElement *muxer, *sink;
2004
2005 g_assert (ctx->is_reference);
2006
2007 /* 1 change to new file */
2008 splitmux->switching_fragment = TRUE;
2009
2010 /* We need to drop the splitmux lock to acquire the state lock
2011 * here and ensure there's no racy state change going on elsewhere */
2012 muxer = gst_object_ref (splitmux->muxer);
2013 sink = gst_object_ref (splitmux->active_sink);
2014
2015 GST_SPLITMUX_UNLOCK (splitmux);
2016 GST_SPLITMUX_STATE_LOCK (splitmux);
2017
2018 if (splitmux->shutdown) {
2019 GST_DEBUG_OBJECT (splitmux,
2020 "Shutdown requested. Aborting fragment switch.");
2021 GST_SPLITMUX_LOCK (splitmux);
2022 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023 gst_object_unref (muxer);
2024 gst_object_unref (sink);
2025 return GST_FLOW_FLUSHING;
2026 }
2027
2028 if (splitmux->async_finalize) {
2029 if (splitmux->muxed_out_bytes > 0
2030 || splitmux->fragment_id != splitmux->start_index) {
2031 gchar *newname;
2032 GstElement *new_sink, *new_muxer;
2033
2034 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035 splitmux->fragment_id);
2036 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038 GST_SPLITMUX_LOCK (splitmux);
2039 if ((splitmux->sink =
2040 create_element (splitmux, splitmux->sink_factory, newname,
2041 TRUE)) == NULL)
2042 goto fail;
2043 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044 gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045 splitmux->sink_preset);
2046 if (splitmux->sink_properties)
2047 gst_structure_foreach (splitmux->sink_properties,
2048 _set_property_from_structure, splitmux->sink);
2049 splitmux->active_sink = splitmux->sink;
2050 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2051 g_free (newname);
2052 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053 if ((splitmux->muxer =
2054 create_element (splitmux, splitmux->muxer_factory, newname,
2055 TRUE)) == NULL)
2056 goto fail;
2057 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2058 "async") != NULL) {
2059 /* async child elements are causing state change races and weird
2060 * failures, so let's try and turn that off */
2061 g_object_set (splitmux->sink, "async", FALSE, NULL);
2062 }
2063 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065 splitmux->muxer_preset);
2066 if (splitmux->muxer_properties)
2067 gst_structure_foreach (splitmux->muxer_properties,
2068 _set_property_from_structure, splitmux->muxer);
2069 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2070 g_free (newname);
2071 new_sink = splitmux->sink;
2072 new_muxer = splitmux->muxer;
2073 GST_SPLITMUX_UNLOCK (splitmux);
2074 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075 gst_element_link (new_muxer, new_sink);
2076
2077 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079 EOS_FROM_US)) == 2) {
2080 _lock_and_set_to_null (muxer, splitmux);
2081 _lock_and_set_to_null (sink, splitmux);
2082 } else {
2083 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084 GINT_TO_POINTER (2));
2085 }
2086 }
2087 gst_object_unref (muxer);
2088 gst_object_unref (sink);
2089 muxer = new_muxer;
2090 sink = new_sink;
2091 gst_object_ref (muxer);
2092 gst_object_ref (sink);
2093 }
2094 } else {
2095
2096 gst_element_set_locked_state (muxer, TRUE);
2097 gst_element_set_locked_state (sink, TRUE);
2098 gst_element_set_state (sink, GST_STATE_NULL);
2099
2100 if (splitmux->reset_muxer) {
2101 gst_element_set_state (muxer, GST_STATE_NULL);
2102 } else {
2103 GstIterator *it = gst_element_iterate_sink_pads (muxer);
2104 GstEvent *ev;
2105 guint32 seqnum;
2106
2107 ev = gst_event_new_flush_start ();
2108 seqnum = gst_event_get_seqnum (ev);
2109 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110 gst_event_unref (ev);
2111
2112 gst_iterator_resync (it);
2113
2114 ev = gst_event_new_flush_stop (TRUE);
2115 gst_event_set_seqnum (ev, seqnum);
2116 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117 gst_event_unref (ev);
2118
2119 gst_iterator_free (it);
2120 }
2121 }
2122
2123 GST_SPLITMUX_LOCK (splitmux);
2124 set_next_filename (splitmux, ctx);
2125 splitmux->muxed_out_bytes = 0;
2126 GST_SPLITMUX_UNLOCK (splitmux);
2127
2128 if (gst_element_set_state (sink,
2129 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130 gst_element_set_state (sink, GST_STATE_NULL);
2131 gst_element_set_locked_state (muxer, FALSE);
2132 gst_element_set_locked_state (sink, FALSE);
2133
2134 goto fail_output;
2135 }
2136
2137 if (gst_element_set_state (muxer,
2138 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139 gst_element_set_state (muxer, GST_STATE_NULL);
2140 gst_element_set_state (sink, GST_STATE_NULL);
2141 gst_element_set_locked_state (muxer, FALSE);
2142 gst_element_set_locked_state (sink, FALSE);
2143 goto fail_muxer;
2144 }
2145
2146 gst_element_set_locked_state (muxer, FALSE);
2147 gst_element_set_locked_state (sink, FALSE);
2148
2149 gst_object_unref (sink);
2150 gst_object_unref (muxer);
2151
2152 GST_SPLITMUX_LOCK (splitmux);
2153 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154 splitmux->switching_fragment = FALSE;
2155 do_async_done (splitmux);
2156
2157 splitmux->ready_for_output = TRUE;
2158
2159 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2161
2162 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2163
2164 /* FIXME: Is this always the correct next state? */
2165 GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2168 return GST_FLOW_OK;
2169
2170 fail:
2171 gst_object_unref (sink);
2172 gst_object_unref (muxer);
2173
2174 GST_SPLITMUX_LOCK (splitmux);
2175 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177 ("Could not create the new muxer/sink"), NULL);
2178 return GST_FLOW_ERROR;
2179
2180 fail_output:
2181 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182 ("Could not start new output sink"), NULL);
2183 gst_object_unref (sink);
2184 gst_object_unref (muxer);
2185
2186 GST_SPLITMUX_LOCK (splitmux);
2187 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188 splitmux->switching_fragment = FALSE;
2189 return GST_FLOW_ERROR;
2190
2191 fail_muxer:
2192 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193 ("Could not start new muxer"), NULL);
2194 gst_object_unref (sink);
2195 gst_object_unref (muxer);
2196
2197 GST_SPLITMUX_LOCK (splitmux);
2198 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199 splitmux->switching_fragment = FALSE;
2200 return GST_FLOW_ERROR;
2201 }
2202
2203 static void
bus_handler(GstBin * bin,GstMessage * message)2204 bus_handler (GstBin * bin, GstMessage * message)
2205 {
2206 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2207
2208 switch (GST_MESSAGE_TYPE (message)) {
2209 case GST_MESSAGE_EOS:{
2210 /* If the state is draining out the current file, drop this EOS */
2211 GstElement *sink;
2212
2213 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214 GST_SPLITMUX_LOCK (splitmux);
2215
2216 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2217
2218 if (splitmux->async_finalize) {
2219
2220 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222 EOS_FROM_US)) == 2) {
2223 GstElement *muxer;
2224 GstPad *sinksink, *muxersrc;
2225
2226 sinksink = gst_element_get_static_pad (sink, "sink");
2227 muxersrc = gst_pad_get_peer (sinksink);
2228 muxer = gst_pad_get_parent_element (muxersrc);
2229 gst_object_unref (sinksink);
2230 gst_object_unref (muxersrc);
2231
2232 gst_element_call_async (muxer,
2233 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234 gst_object_ref (splitmux), gst_object_unref);
2235 gst_element_call_async (sink,
2236 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237 gst_object_ref (splitmux), gst_object_unref);
2238 gst_object_unref (muxer);
2239 } else {
2240 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241 GINT_TO_POINTER (2));
2242 }
2243 GST_DEBUG_OBJECT (splitmux,
2244 "Caught async EOS from previous muxer+sink. Dropping.");
2245 /* We forward the EOS so that it gets aggregated as normal. If the sink
2246 * finishes and is removed before the end, it will be de-aggregated */
2247 gst_message_unref (message);
2248 GST_SPLITMUX_UNLOCK (splitmux);
2249 return;
2250 }
2251 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252 GST_DEBUG_OBJECT (splitmux,
2253 "Passing EOS message. Output state %d max_out_running_time %"
2254 GST_STIME_FORMAT, splitmux->output_state,
2255 GST_STIME_ARGS (splitmux->max_out_running_time));
2256 } else {
2257 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2260
2261 gst_message_unref (message);
2262 GST_SPLITMUX_UNLOCK (splitmux);
2263 return;
2264 }
2265 GST_SPLITMUX_UNLOCK (splitmux);
2266 break;
2267 }
2268 case GST_MESSAGE_ASYNC_START:
2269 case GST_MESSAGE_ASYNC_DONE:
2270 /* Ignore state changes from our children while switching */
2271 GST_SPLITMUX_LOCK (splitmux);
2272 if (splitmux->switching_fragment) {
2273 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275 GST_LOG_OBJECT (splitmux,
2276 "Ignoring state change from child %" GST_PTR_FORMAT
2277 " while switching", GST_MESSAGE_SRC (message));
2278 gst_message_unref (message);
2279 GST_SPLITMUX_UNLOCK (splitmux);
2280 return;
2281 }
2282 }
2283 GST_SPLITMUX_UNLOCK (splitmux);
2284 break;
2285 case GST_MESSAGE_WARNING:
2286 {
2287 GError *gerror = NULL;
2288
2289 gst_message_parse_warning (message, &gerror, NULL);
2290
2291 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2292 GList *item;
2293 gboolean caps_change = FALSE;
2294
2295 GST_SPLITMUX_LOCK (splitmux);
2296
2297 for (item = splitmux->contexts; item; item = item->next) {
2298 MqStreamCtx *ctx = item->data;
2299
2300 if (ctx->caps_change) {
2301 caps_change = TRUE;
2302 break;
2303 }
2304 }
2305
2306 GST_SPLITMUX_UNLOCK (splitmux);
2307
2308 if (caps_change) {
2309 GST_LOG_OBJECT (splitmux,
2310 "Ignoring warning change from child %" GST_PTR_FORMAT
2311 " while switching caps", GST_MESSAGE_SRC (message));
2312 gst_message_unref (message);
2313 return;
2314 }
2315 }
2316 break;
2317 }
2318 default:
2319 break;
2320 }
2321
2322 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2323 }
2324
2325 static void
ctx_set_unblock(MqStreamCtx * ctx)2326 ctx_set_unblock (MqStreamCtx * ctx)
2327 {
2328 ctx->need_unblock = TRUE;
2329 }
2330
2331 static gboolean
need_new_fragment(GstSplitMuxSink * splitmux,GstClockTime queued_time,GstClockTime queued_gop_time,guint64 queued_bytes)2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333 GstClockTime queued_time, GstClockTime queued_gop_time,
2334 guint64 queued_bytes)
2335 {
2336 guint64 thresh_bytes;
2337 GstClockTime thresh_time;
2338 gboolean check_robust_muxing;
2339 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340 GstClockTime *ptr_to_time;
2341 const InputGop *gop, *next_gop;
2342
2343 GST_OBJECT_LOCK (splitmux);
2344 thresh_bytes = splitmux->threshold_bytes;
2345 thresh_time = splitmux->threshold_time;
2346 ptr_to_time = (GstClockTime *)
2347 gst_queue_array_peek_head_struct (splitmux->times_to_split);
2348 if (ptr_to_time)
2349 time_to_split = *ptr_to_time;
2350 check_robust_muxing = splitmux->use_robust_muxing
2351 && splitmux->muxer_has_reserved_props;
2352 GST_OBJECT_UNLOCK (splitmux);
2353
2354 /* Have we muxed at least one thing from the reference
2355 * stream into the file? If not, no other streams can have
2356 * either */
2357 if (splitmux->fragment_reference_bytes <= 0) {
2358 GST_TRACE_OBJECT (splitmux,
2359 "Not ready to split - nothing muxed on the reference stream");
2360 return FALSE;
2361 }
2362
2363 /* User told us to split now */
2364 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365 GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2366 return TRUE;
2367 }
2368
2369 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370 /* We need a full GOP queued up at this point */
2371 g_assert (gop != NULL);
2372 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373 /* And the beginning of the next GOP or otherwise EOS */
2374
2375 /* User told us to split at this running time */
2376 if (gop->start_time >= time_to_split) {
2377 GST_OBJECT_LOCK (splitmux);
2378 /* Dequeue running time */
2379 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380 /* Empty any running times after this that are past now */
2381 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382 while (ptr_to_time) {
2383 time_to_split = *ptr_to_time;
2384 if (gop->start_time < time_to_split) {
2385 break;
2386 }
2387 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2389 }
2390 GST_TRACE_OBJECT (splitmux,
2391 "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392 GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393 GST_STIME_ARGS (time_to_split));
2394 GST_OBJECT_UNLOCK (splitmux);
2395 return TRUE;
2396 }
2397
2398 if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399 GST_TRACE_OBJECT (splitmux,
2400 "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401 return TRUE; /* Would overrun byte limit */
2402 }
2403
2404 if (thresh_time > 0 && queued_time > thresh_time) {
2405 GST_TRACE_OBJECT (splitmux,
2406 "queued time %" GST_STIME_FORMAT " overruns time limit",
2407 GST_STIME_ARGS (queued_time));
2408 return TRUE; /* Would overrun time limit */
2409 }
2410
2411 if (splitmux->tc_interval) {
2412 GstClockTime next_gop_start_time =
2413 next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2414
2415 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416 GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417 next_gop_start_time >
2418 splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419 GST_TRACE_OBJECT (splitmux,
2420 "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421 GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2423 return TRUE;
2424 }
2425 }
2426
2427 if (check_robust_muxing) {
2428 GstClockTime mux_reserved_remain;
2429
2430 g_object_get (splitmux->muxer,
2431 "reserved-duration-remaining", &mux_reserved_remain, NULL);
2432
2433 GST_LOG_OBJECT (splitmux,
2434 "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436 mux_reserved_remain, queued_gop_time);
2437
2438 if (queued_gop_time >= mux_reserved_remain) {
2439 GST_INFO_OBJECT (splitmux,
2440 "File is about to run out of header room - %" G_GUINT64_FORMAT
2441 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442 ". Switching to new file", mux_reserved_remain, queued_gop_time);
2443 return TRUE;
2444 }
2445 }
2446
2447 /* Continue and mux this GOP */
2448 return FALSE;
2449 }
2450
2451 /* probably we want to add this API? */
2452 static void
video_time_code_replace(GstVideoTimeCode ** old_tc,GstVideoTimeCode * new_tc)2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2454 {
2455 GstVideoTimeCode *timecode = NULL;
2456
2457 g_return_if_fail (old_tc != NULL);
2458
2459 if (*old_tc == new_tc)
2460 return;
2461
2462 if (new_tc)
2463 timecode = gst_video_time_code_copy (new_tc);
2464
2465 if (*old_tc)
2466 gst_video_time_code_free (*old_tc);
2467
2468 *old_tc = timecode;
2469 }
2470
2471 #ifdef OHOS_OPT_COMPAT
2472 // ohos.opt.compat.0039
2473 // Ignore the error and continue to execute, waiting for EOS
continue_execution(GstSplitMuxSink * splitmux,GstClockTimeDiff max_out_running_time)2474 static void continue_execution (GstSplitMuxSink * splitmux, GstClockTimeDiff max_out_running_time)
2475 {
2476 SplitMuxOutputCommand *cmd;
2477
2478 if (splitmux) {
2479 // Make handle_mq_Input is not blocked
2480 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2481 // Wake up handle_mq_output
2482 cmd = out_cmd_buf_new ();
2483 cmd->start_new_fragment = FALSE;
2484 cmd->max_output_ts = max_out_running_time;
2485 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2486 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2487 }
2488
2489 return;
2490 }
2491 #endif
2492
2493 /* Called with splitmux lock held */
2494 /* Called when entering ProcessingCompleteGop state
2495 * Assess if mq contents overflowed the current file
2496 * -> If yes, need to switch to new file
2497 * -> if no, set max_out_running_time to let this GOP in and
2498 * go to COLLECTING_GOP_START state
2499 */
2500 static void
handle_gathered_gop(GstSplitMuxSink * splitmux,const InputGop * gop,GstClockTimeDiff next_gop_start_time,GstClockTimeDiff max_out_running_time)2501 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2502 GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2503 {
2504 guint64 queued_bytes;
2505 GstClockTimeDiff queued_time = 0;
2506 GstClockTimeDiff queued_gop_time = 0;
2507 SplitMuxOutputCommand *cmd;
2508
2509 /* Assess if the multiqueue contents overflowed the current file */
2510 /* When considering if a newly gathered GOP overflows
2511 * the time limit for the file, only consider the running time of the
2512 * reference stream. Other streams might have run ahead a little bit,
2513 * but extra pieces won't be released to the muxer beyond the reference
2514 * stream cut-off anyway - so it forms the limit. */
2515 queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2516 queued_time = next_gop_start_time;
2517 /* queued_gop_time tracks how much unwritten data there is waiting to
2518 * be written to this fragment including this GOP */
2519 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2520 queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2521 else
2522 queued_gop_time = queued_time - gop->start_time;
2523
2524 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2525 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2526 " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2527 " gop start time %" GST_STIME_FORMAT,
2528 GST_STIME_ARGS (queued_time), queued_bytes,
2529 GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2530
2531 if (queued_gop_time < 0)
2532 goto error_gop_duration;
2533
2534 if (queued_time < splitmux->fragment_start_time)
2535 goto error_queued_time;
2536
2537 queued_time -= splitmux->fragment_start_time;
2538 if (queued_time < queued_gop_time)
2539 queued_gop_time = queued_time;
2540
2541 /* Expand queued bytes estimate by muxer overhead */
2542 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2543
2544 /* Check for overrun - have we output at least one byte and overrun
2545 * either threshold? */
2546 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2547 if (splitmux->async_finalize) {
2548 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2549 *sink_running_time = splitmux->reference_ctx->out_running_time;
2550 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2551 RUNNING_TIME, sink_running_time, g_free);
2552 }
2553 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2554 /* Tell the output side to start a new fragment */
2555 GST_INFO_OBJECT (splitmux,
2556 "This GOP (dur %" GST_STIME_FORMAT
2557 ") would overflow the fragment, Sending start_new_fragment cmd",
2558 GST_STIME_ARGS (queued_gop_time));
2559 cmd = out_cmd_buf_new ();
2560 cmd->start_new_fragment = TRUE;
2561 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2562 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2563
2564 splitmux->fragment_start_time = gop->start_time;
2565 splitmux->fragment_start_time_pts = gop->start_time_pts;
2566 splitmux->fragment_total_bytes = 0;
2567 splitmux->fragment_reference_bytes = 0;
2568
2569 video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2570 splitmux->next_fragment_start_tc_time =
2571 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2572 splitmux->fragment_start_time, NULL);
2573 if (splitmux->tc_interval && splitmux->fragment_start_tc
2574 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2575 GST_WARNING_OBJECT (splitmux,
2576 "Couldn't calculate next fragment start time for timecode mode");
2577 }
2578 }
2579
2580 /* And set up to collect the next GOP */
2581 if (max_out_running_time != G_MAXINT64) {
2582 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2583 } else {
2584 /* This is probably already the current state, but just in case: */
2585 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2586 }
2587
2588 /* And wake all input contexts to send a wake-up event */
2589 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2590 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2591
2592 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2593 splitmux->fragment_total_bytes += gop->total_bytes;
2594 splitmux->fragment_reference_bytes += gop->reference_bytes;
2595
2596 if (gop->total_bytes > 0) {
2597 GST_LOG_OBJECT (splitmux,
2598 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2599 " time %" GST_STIME_FORMAT,
2600 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2601
2602 /* Send this GOP to the output command queue */
2603 cmd = out_cmd_buf_new ();
2604 cmd->start_new_fragment = FALSE;
2605 cmd->max_output_ts = max_out_running_time;
2606 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2607 GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2608 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2609
2610 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2611 }
2612
2613 return;
2614
2615 error_gop_duration:
2616 #ifdef OHOS_OPT_COMPAT
2617 // ohos.opt.compat.0039
2618 continue_execution(splitmux, max_out_running_time);
2619 #endif
2620 GST_ELEMENT_ERROR (splitmux,
2621 STREAM, FAILED, ("Timestamping error on input streams"),
2622 ("Queued GOP time is negative %" GST_STIME_FORMAT,
2623 GST_STIME_ARGS (queued_gop_time)));
2624 return;
2625 error_queued_time:
2626 #ifdef OHOS_OPT_COMPAT
2627 // ohos.opt.compat.0039
2628 continue_execution(splitmux, max_out_running_time);
2629 #endif
2630 GST_ELEMENT_ERROR (splitmux,
2631 STREAM, FAILED, ("Timestamping error on input streams"),
2632 ("Queued time is negative. Input went backwards. queued_time - %"
2633 GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2634 return;
2635 }
2636
2637 /* Called with splitmux lock held */
2638 /* Called from each input pad when it is has all the pieces
2639 * for a GOP or EOS, starting with the reference pad which has set the
2640 * splitmux->max_in_running_time
2641 */
2642 static void
check_completed_gop(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)2643 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2644 {
2645 GList *cur;
2646 GstEvent *event;
2647
2648 /* On ENDING_FILE, the reference stream sends a command to start a new
2649 * fragment, then releases the GOP for output in the new fragment.
2650 * If some streams received no buffer during the last GOP that overran,
2651 * because its next buffer has a timestamp bigger than
2652 * ctx->max_in_running_time, its queue is empty. In that case the only
2653 * way to wakeup the output thread is by injecting an event in the
2654 * queue. This usually happen with subtitle streams.
2655 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2656 if (ctx->need_unblock) {
2657 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2658 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2659 GST_EVENT_TYPE_SERIALIZED,
2660 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2661 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2662
2663 GST_SPLITMUX_UNLOCK (splitmux);
2664 gst_pad_send_event (ctx->sinkpad, event);
2665 GST_SPLITMUX_LOCK (splitmux);
2666
2667 ctx->need_unblock = FALSE;
2668 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2669 /* state may have changed while we were unlocked. Loop again if so */
2670 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2671 return;
2672 }
2673
2674 do {
2675 GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2676
2677 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2678 GstClockTimeDiff max_out_running_time;
2679 gboolean ready = TRUE;
2680 InputGop *gop;
2681 const InputGop *next_gop;
2682
2683 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2684 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2685
2686 #ifdef OHOS_OPT_COMPAT
2687 /* ohos.opt.stable.0045
2688 * When the plug-in does not switch to the playing state, there is no data passing by.
2689 * At this time, it receives the eos signal, and the gop and gops are null, resulting in an assertion error.
2690 * Set input_state to COLLECTING_GOP_START, put non-reference data source into sleep and stop receiving data */
2691 if ((gop == NULL) && (next_gop == NULL)) {
2692 GST_WARNING_OBJECT (splitmux, "Received eos signal. No further GOPs finished collecting");
2693 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2694 return;
2695 }
2696 #endif
2697
2698 /* If we have no GOP or no next GOP here then the reference context is
2699 * at EOS, otherwise use the start time of the next GOP if we're far
2700 * enough in the GOP to know it */
2701 if (gop && next_gop) {
2702 if (!splitmux->reference_ctx->in_eos
2703 && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2704 && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2705 GST_LOG_OBJECT (splitmux,
2706 "No further GOPs finished collecting, waiting until current DTS %"
2707 GST_STIME_FORMAT " has passed next GOP start PTS %"
2708 GST_STIME_FORMAT,
2709 GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2710 GST_STIME_ARGS (next_gop->start_time_pts));
2711 break;
2712 }
2713
2714 GST_LOG_OBJECT (splitmux,
2715 "Finished collecting GOP with start time %" GST_STIME_FORMAT
2716 ", next GOP start time %" GST_STIME_FORMAT,
2717 GST_STIME_ARGS (gop->start_time),
2718 GST_STIME_ARGS (next_gop->start_time));
2719 next_gop_start = next_gop->start_time;
2720 max_out_running_time =
2721 splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2722 } else if (!next_gop) {
2723 GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2724 next_gop_start = splitmux->max_in_running_time;
2725 max_out_running_time = G_MAXINT64;
2726 } else if (!gop) {
2727 GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2728 break;
2729 } else {
2730 g_assert_not_reached ();
2731 }
2732
2733 g_assert (gop != NULL);
2734
2735 /* Iterate each pad, and check that the input running time is at least
2736 * up to the start running time of the next GOP or EOS, and if so handle
2737 * the collected GOP */
2738 GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2739 GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2740 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2741 cur = g_list_next (cur)) {
2742 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2743
2744 GST_LOG_OBJECT (splitmux,
2745 "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2746 " EOS %d", tmpctx, tmpctx->sinkpad,
2747 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2748
2749 if (next_gop_start != GST_CLOCK_STIME_NONE &&
2750 tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2751 #ifdef OHOS_OPT_COMPAT
2752 // ohos.opt.compat.0042
2753 // Prevent the gap between video pts and audio pts from being too large, which will lead to waiting all the time
2754 GstClockTimeDiff diff = next_gop_start - tmpctx->in_running_time;
2755 if (diff >= 2 * GST_SECOND) { // Error reported for more than 2s
2756 GST_ELEMENT_ERROR (splitmux, STREAM, FAILED, ("Timestamping error on input streams"),
2757 ("Context %p sink pad %" GST_PTR_FORMAT " running time %" GST_STIME_FORMAT
2758 ", next gop start: %" GST_STIME_FORMAT ", diff %" GST_STIME_FORMAT,
2759 tmpctx, tmpctx->sinkpad, GST_STIME_ARGS (tmpctx->in_running_time),
2760 GST_STIME_ARGS (next_gop_start), GST_STIME_ARGS (diff)));
2761 } else if (diff >= 500 * GST_MSECOND) { // Record error log for more than 500ms
2762 GST_ERROR_OBJECT (splitmux, "Context %p sink pad %" GST_PTR_FORMAT " running time %" GST_STIME_FORMAT
2763 ", next gop start: %" GST_STIME_FORMAT ", diff %" GST_STIME_FORMAT,
2764 tmpctx, tmpctx->sinkpad, GST_STIME_ARGS (tmpctx->in_running_time),
2765 GST_STIME_ARGS (next_gop_start), GST_STIME_ARGS (diff));
2766 }
2767 #endif
2768 GST_LOG_OBJECT (splitmux,
2769 "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2770 tmpctx, tmpctx->sinkpad);
2771 ready = FALSE;
2772 break;
2773 }
2774 }
2775 if (ready) {
2776 GST_DEBUG_OBJECT (splitmux,
2777 "Collected GOP is complete. Processing (ctx %p)", ctx);
2778 /* All pads have a complete GOP, release it into the multiqueue */
2779 handle_gathered_gop (splitmux, gop, next_gop_start,
2780 max_out_running_time);
2781
2782 g_queue_pop_head (&splitmux->pending_input_gops);
2783 input_gop_free (gop);
2784
2785 /* The user has requested a split, we can split now that the previous GOP
2786 * has been collected to the correct location */
2787 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2788 TRUE, FALSE)) {
2789 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2790 }
2791 }
2792 }
2793
2794 /* If upstream reached EOS we are not expecting more data, no need to wait
2795 * here. */
2796 if (ctx->in_eos)
2797 return;
2798
2799 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2800 !ctx->flushing &&
2801 ctx->in_running_time >= next_gop_start &&
2802 next_gop_start != GST_CLOCK_STIME_NONE) {
2803 /* Some pad is not yet ready, or GOP is being pushed
2804 * either way, sleep and wait to get woken */
2805 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2806 GST_SPLITMUX_WAIT_INPUT (splitmux);
2807 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2808 } else {
2809 /* This pad is not ready or the state changed - break out and get another
2810 * buffer / event */
2811 break;
2812 }
2813 } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2814 }
2815
2816 static GstPadProbeReturn
handle_mq_input(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)2817 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2818 {
2819 GstSplitMuxSink *splitmux = ctx->splitmux;
2820 GstFlowReturn ret = GST_FLOW_OK;
2821 GstBuffer *buf;
2822 MqStreamBuf *buf_info = NULL;
2823 GstClockTime ts, pts, dts;
2824 GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2825 gboolean loop_again;
2826 gboolean keyframe = FALSE;
2827
2828 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2829
2830 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2831 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2832 g_warning ("Buffer list handling not implemented");
2833 return GST_PAD_PROBE_DROP;
2834 }
2835 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2836 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2837 GstEvent *event = gst_pad_probe_info_get_event (info);
2838
2839 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2840
2841 switch (GST_EVENT_TYPE (event)) {
2842 case GST_EVENT_SEGMENT:
2843 gst_event_copy_segment (event, &ctx->in_segment);
2844 break;
2845 case GST_EVENT_FLUSH_STOP:
2846 GST_SPLITMUX_LOCK (splitmux);
2847 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2848 ctx->in_eos = FALSE;
2849 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2850 GST_SPLITMUX_UNLOCK (splitmux);
2851 break;
2852 case GST_EVENT_EOS:
2853 GST_SPLITMUX_LOCK (splitmux);
2854 ctx->in_eos = TRUE;
2855
2856 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2857 ret = GST_FLOW_FLUSHING;
2858 goto beach;
2859 }
2860
2861 if (ctx->is_reference) {
2862 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2863 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2864 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2865 /* Wake up other input pads to collect this GOP */
2866 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2867 check_completed_gop (splitmux, ctx);
2868 } else if (splitmux->input_state ==
2869 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2870 /* If we are waiting for a GOP to be completed (ie, for aux
2871 * pads to catch up), then this pad is complete, so check
2872 * if the whole GOP is.
2873 */
2874 check_completed_gop (splitmux, ctx);
2875 }
2876 GST_SPLITMUX_UNLOCK (splitmux);
2877 break;
2878 case GST_EVENT_GAP:{
2879 GstClockTime gap_ts;
2880 GstClockTimeDiff rtime;
2881
2882 gst_event_parse_gap (event, &gap_ts, NULL);
2883 if (gap_ts == GST_CLOCK_TIME_NONE)
2884 break;
2885
2886 GST_SPLITMUX_LOCK (splitmux);
2887
2888 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2889 ret = GST_FLOW_FLUSHING;
2890 goto beach;
2891 }
2892 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2893
2894 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2895 GST_STIME_ARGS (rtime));
2896
2897 if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2898 /* If this GAP event happens before the first fragment then
2899 * initialize the fragment start time here. */
2900 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2901 splitmux->fragment_start_time = rtime;
2902 GST_LOG_OBJECT (splitmux,
2903 "Fragment start time now %" GST_STIME_FORMAT,
2904 GST_STIME_ARGS (splitmux->fragment_start_time));
2905
2906 /* Also take this as the first start time when starting up,
2907 * so that we start counting overflow from the first frame */
2908 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2909 splitmux->max_in_running_time = rtime;
2910 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2911 splitmux->max_in_running_time_dts = rtime;
2912 }
2913
2914 /* Similarly take it as fragment start PTS and GOP start time if
2915 * these are not set */
2916 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2917 splitmux->fragment_start_time_pts = rtime;
2918
2919 if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2920 InputGop *gop = g_slice_new0 (InputGop);
2921
2922 gop->from_gap = TRUE;
2923 gop->start_time = rtime;
2924 gop->start_time_pts = rtime;
2925
2926 g_queue_push_tail (&splitmux->pending_input_gops, gop);
2927 }
2928 }
2929
2930 GST_SPLITMUX_UNLOCK (splitmux);
2931 break;
2932 }
2933 default:
2934 break;
2935 }
2936 return GST_PAD_PROBE_PASS;
2937 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2938 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2939 case GST_QUERY_ALLOCATION:
2940 return GST_PAD_PROBE_DROP;
2941 default:
2942 return GST_PAD_PROBE_PASS;
2943 }
2944 }
2945
2946 buf = gst_pad_probe_info_get_buffer (info);
2947 buf_info = mq_stream_buf_new ();
2948
2949 pts = GST_BUFFER_PTS (buf);
2950 dts = GST_BUFFER_DTS (buf);
2951 if (GST_BUFFER_PTS_IS_VALID (buf))
2952 ts = GST_BUFFER_PTS (buf);
2953 else
2954 ts = GST_BUFFER_DTS (buf);
2955
2956 GST_LOG_OBJECT (pad,
2957 "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2958 GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2959 GST_TIME_ARGS (dts));
2960
2961 #ifdef OHOS_OPT_COMPAT
2962 // ohos.opt.compat.0018
2963 // to avoid when buffer take too many time in src,
2964 // cause pst comes to clock none.
2965 GST_BUFFER_PTS (buf) = ts;
2966 GST_BUFFER_DTS (buf) = ts;
2967 #endif
2968
2969 GST_SPLITMUX_LOCK (splitmux);
2970
2971 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2972 ret = GST_FLOW_FLUSHING;
2973 goto beach;
2974 }
2975
2976 /* If this buffer has a timestamp, advance the input timestamp of the
2977 * stream */
2978 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2979 running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2980
2981 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2982 GST_STIME_ARGS (running_time));
2983
2984 /* in running time is always the maximum PTS (or DTS) that was observed so far */
2985 if (GST_CLOCK_STIME_IS_VALID (running_time)
2986 && running_time > ctx->in_running_time)
2987 ctx->in_running_time = running_time;
2988 } else {
2989 running_time = ctx->in_running_time;
2990 }
2991
2992 if (GST_CLOCK_TIME_IS_VALID (pts))
2993 running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2994 else
2995 running_time_pts = GST_CLOCK_STIME_NONE;
2996
2997 if (GST_CLOCK_TIME_IS_VALID (dts))
2998 running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2999 else
3000 running_time_dts = GST_CLOCK_STIME_NONE;
3001
3002 /* Try to make sure we have a valid running time */
3003 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
3004 ctx->in_running_time =
3005 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
3006 }
3007
3008 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
3009 GST_STIME_ARGS (ctx->in_running_time));
3010
3011 buf_info->run_ts = ctx->in_running_time;
3012 buf_info->buf_size = gst_buffer_get_size (buf);
3013 buf_info->duration = GST_BUFFER_DURATION (buf);
3014
3015 if (ctx->is_reference) {
3016 InputGop *gop = NULL;
3017 GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
3018
3019 /* initialize fragment_start_time if it was not set yet (i.e. for the
3020 * first fragment), or otherwise set it to the minimum observed time */
3021 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
3022 || splitmux->fragment_start_time > running_time) {
3023 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
3024 splitmux->fragment_start_time_pts = running_time_pts;
3025 splitmux->fragment_start_time = running_time;
3026
3027 GST_LOG_OBJECT (splitmux,
3028 "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
3029 GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
3030 GST_STIME_ARGS (splitmux->fragment_start_time_pts));
3031
3032 /* Also take this as the first start time when starting up,
3033 * so that we start counting overflow from the first frame */
3034 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
3035 || splitmux->max_in_running_time < splitmux->fragment_start_time)
3036 splitmux->max_in_running_time = splitmux->fragment_start_time;
3037
3038 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
3039 splitmux->max_in_running_time_dts = running_time_dts;
3040
3041 if (tc_meta) {
3042 video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
3043
3044 splitmux->next_fragment_start_tc_time =
3045 calculate_next_max_timecode (splitmux, &tc_meta->tc,
3046 running_time, NULL);
3047 if (splitmux->tc_interval
3048 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
3049 {
3050 GST_WARNING_OBJECT (splitmux,
3051 "Couldn't calculate next fragment start time for timecode mode");
3052 }
3053 #ifndef GST_DISABLE_GST_DEBUG
3054 {
3055 gchar *tc_str;
3056
3057 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3058 GST_DEBUG_OBJECT (splitmux,
3059 "Initialize fragment start timecode %s, next fragment start timecode time %"
3060 GST_TIME_FORMAT, tc_str,
3061 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
3062 g_free (tc_str);
3063 }
3064 #endif
3065 }
3066 }
3067
3068
3069 /* First check if we're at the very first GOP and the tracking was created
3070 * from a GAP event. In that case don't start a new GOP on keyframes but
3071 * just updated it as needed */
3072 gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3073
3074 if (!gop || (!gop->from_gap
3075 && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3076 gop = g_slice_new0 (InputGop);
3077
3078 gop->start_time = running_time;
3079 gop->start_time_pts = running_time_pts;
3080
3081 GST_LOG_OBJECT (splitmux,
3082 "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3083 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3084 GST_STIME_ARGS (gop->start_time_pts));
3085
3086 if (tc_meta) {
3087 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3088
3089 #ifndef GST_DISABLE_GST_DEBUG
3090 {
3091 gchar *tc_str;
3092
3093 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3094 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3095 g_free (tc_str);
3096 }
3097 #endif
3098 }
3099
3100 g_queue_push_tail (&splitmux->pending_input_gops, gop);
3101 } else {
3102 gop->from_gap = FALSE;
3103
3104 if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3105 || gop->start_time > running_time) {
3106 gop->start_time = running_time;
3107
3108 GST_LOG_OBJECT (splitmux,
3109 "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3110 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3111 GST_STIME_ARGS (gop->start_time_pts));
3112
3113 if (tc_meta) {
3114 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3115
3116 #ifndef GST_DISABLE_GST_DEBUG
3117 {
3118 gchar *tc_str;
3119
3120 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3121 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3122 tc_str);
3123 g_free (tc_str);
3124 }
3125 #endif
3126 }
3127 }
3128 }
3129
3130 /* Check whether we need to request next keyframe depending on
3131 * current running time */
3132 if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3133 GST_WARNING_OBJECT (splitmux,
3134 "Could not request a keyframe. Files may not split at the exact location they should");
3135 }
3136 }
3137
3138 {
3139 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3140
3141 if (gop) {
3142 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3143 " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3144 G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3145 gop->total_bytes, gop->total_bytes);
3146 }
3147 }
3148
3149 loop_again = TRUE;
3150 do {
3151 if (ctx->flushing)
3152 break;
3153
3154 switch (splitmux->input_state) {
3155 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3156 if (ctx->is_releasing) {
3157 /* The pad belonging to this context is being released */
3158 GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3159 "running. Data might not drain correctly");
3160 loop_again = FALSE;
3161 } else if (ctx->is_reference) {
3162 const InputGop *gop, *next_gop;
3163
3164 /* This is the reference context. If it's a keyframe,
3165 * it marks the start of a new GOP and we should wait in
3166 * check_completed_gop before continuing, but either way
3167 * (keyframe or no, we'll pass this buffer through after
3168 * so set loop_again to FALSE */
3169 loop_again = FALSE;
3170
3171 gop = g_queue_peek_head (&splitmux->pending_input_gops);
3172 g_assert (gop != NULL);
3173 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3174
3175 if (ctx->in_running_time > splitmux->max_in_running_time)
3176 splitmux->max_in_running_time = ctx->in_running_time;
3177 if (running_time_dts > splitmux->max_in_running_time_dts)
3178 splitmux->max_in_running_time_dts = running_time_dts;
3179
3180 GST_LOG_OBJECT (splitmux,
3181 "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3182 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3183 GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3184
3185 if (!next_gop) {
3186 GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3187 /* Allow other input pads to catch up to here too */
3188 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3189 break;
3190 }
3191
3192 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3193 GST_INFO_OBJECT (pad,
3194 "Have keyframe with running time %" GST_STIME_FORMAT,
3195 GST_STIME_ARGS (ctx->in_running_time));
3196 keyframe = TRUE;
3197 }
3198
3199 if (running_time_dts != GST_CLOCK_STIME_NONE
3200 && running_time_dts < next_gop->start_time_pts) {
3201 GST_DEBUG_OBJECT (splitmux,
3202 "Waiting until DTS (%" GST_STIME_FORMAT
3203 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3204 GST_STIME_ARGS (running_time_dts),
3205 GST_STIME_ARGS (next_gop->start_time_pts));
3206 /* Allow other input pads to catch up to here too */
3207 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3208 break;
3209 }
3210
3211 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3212 /* Wake up other input pads to collect this GOP */
3213 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3214 check_completed_gop (splitmux, ctx);
3215 } else {
3216 /* Pass this buffer if the reference ctx is far enough ahead */
3217 if (ctx->in_running_time < splitmux->max_in_running_time) {
3218 loop_again = FALSE;
3219 break;
3220 }
3221
3222 /* We're still waiting for a keyframe on the reference pad, sleep */
3223 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3224 GST_SPLITMUX_WAIT_INPUT (splitmux);
3225 GST_LOG_OBJECT (pad,
3226 "Done sleeping for GOP start input state now %d",
3227 splitmux->input_state);
3228 }
3229 break;
3230 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3231 /* We're collecting a GOP, this is only ever called for non-reference
3232 * contexts as the reference context would be waiting inside
3233 * check_completed_gop() */
3234
3235 g_assert (!ctx->is_reference);
3236
3237 /* If we overran the target timestamp, it might be time to process
3238 * the GOP, otherwise bail out for more data. */
3239 GST_LOG_OBJECT (pad,
3240 "Checking TS %" GST_STIME_FORMAT " against max %"
3241 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3242 GST_STIME_ARGS (splitmux->max_in_running_time));
3243
3244 if (ctx->in_running_time < splitmux->max_in_running_time) {
3245 loop_again = FALSE;
3246 break;
3247 }
3248
3249 GST_LOG_OBJECT (pad,
3250 "Collected last packet of GOP. Checking other pads");
3251 check_completed_gop (splitmux, ctx);
3252 break;
3253 }
3254 case SPLITMUX_INPUT_STATE_FINISHING_UP:
3255 loop_again = FALSE;
3256 break;
3257 default:
3258 loop_again = FALSE;
3259 break;
3260 }
3261 }
3262 while (loop_again);
3263
3264 if (keyframe && ctx->is_reference)
3265 splitmux->queued_keyframes++;
3266 buf_info->keyframe = keyframe;
3267
3268 /* Update total input byte counter for overflow detect unless we're after
3269 * EOS now */
3270 if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3271 && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3272 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3273
3274 /* We must have a GOP at this point */
3275 g_assert (gop != NULL);
3276
3277 gop->total_bytes += buf_info->buf_size;
3278 if (ctx->is_reference) {
3279 gop->reference_bytes += buf_info->buf_size;
3280 }
3281 }
3282
3283 /* Now add this buffer to the queue just before returning */
3284 g_queue_push_head (&ctx->queued_bufs, buf_info);
3285
3286 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3287 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3288
3289 GST_SPLITMUX_UNLOCK (splitmux);
3290 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3291 return GST_PAD_PROBE_PASS;
3292
3293 beach:
3294 GST_SPLITMUX_UNLOCK (splitmux);
3295 if (buf_info)
3296 mq_stream_buf_free (buf_info);
3297 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3298 return GST_PAD_PROBE_PASS;
3299 }
3300
3301 static void
grow_blocked_queues(GstSplitMuxSink * splitmux)3302 grow_blocked_queues (GstSplitMuxSink * splitmux)
3303 {
3304 GList *cur;
3305
3306 /* Scan other queues for full-ness and grow them */
3307 for (cur = g_list_first (splitmux->contexts);
3308 cur != NULL; cur = g_list_next (cur)) {
3309 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3310 guint cur_limit;
3311 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3312
3313 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3314 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3315
3316 if (cur_len >= cur_limit) {
3317 cur_limit = cur_len + 1;
3318 GST_DEBUG_OBJECT (tmpctx->q,
3319 "Queue overflowed and needs enlarging. Growing to %u buffers",
3320 cur_limit);
3321 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3322 }
3323 }
3324 }
3325
3326 static void
handle_q_underrun(GstElement * q,gpointer user_data)3327 handle_q_underrun (GstElement * q, gpointer user_data)
3328 {
3329 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3330 GstSplitMuxSink *splitmux = ctx->splitmux;
3331
3332 GST_SPLITMUX_LOCK (splitmux);
3333 GST_DEBUG_OBJECT (q,
3334 "Queue reported underrun with %d keyframes and %d cmds enqueued",
3335 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3336 grow_blocked_queues (splitmux);
3337 GST_SPLITMUX_UNLOCK (splitmux);
3338 }
3339
3340 static void
handle_q_overrun(GstElement * q,gpointer user_data)3341 handle_q_overrun (GstElement * q, gpointer user_data)
3342 {
3343 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3344 GstSplitMuxSink *splitmux = ctx->splitmux;
3345 gboolean allow_grow = FALSE;
3346
3347 GST_SPLITMUX_LOCK (splitmux);
3348 GST_DEBUG_OBJECT (q,
3349 "Queue reported overrun with %d keyframes and %d cmds enqueued",
3350 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3351
3352 if (splitmux->queued_keyframes < 2) {
3353 /* Less than a full GOP queued, grow the queue */
3354 allow_grow = TRUE;
3355 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3356 allow_grow = TRUE;
3357 } else {
3358 /* If another queue is starved, grow */
3359 GList *cur;
3360 for (cur = g_list_first (splitmux->contexts);
3361 cur != NULL; cur = g_list_next (cur)) {
3362 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3363 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3364 allow_grow = TRUE;
3365 }
3366 }
3367 }
3368 GST_SPLITMUX_UNLOCK (splitmux);
3369
3370 if (allow_grow) {
3371 guint cur_limit;
3372
3373 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3374 cur_limit++;
3375
3376 GST_DEBUG_OBJECT (q,
3377 "Queue overflowed and needs enlarging. Growing to %u buffers",
3378 cur_limit);
3379
3380 g_object_set (q, "max-size-buffers", cur_limit, NULL);
3381 }
3382 }
3383
3384 /* Called with SPLITMUX lock held */
3385 static const gchar *
lookup_muxer_pad(GstSplitMuxSink * splitmux,const gchar * sinkpad_name)3386 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3387 {
3388 const gchar *ret = NULL;
3389
3390 if (splitmux->muxerpad_map == NULL)
3391 return NULL;
3392
3393 if (sinkpad_name == NULL) {
3394 GST_WARNING_OBJECT (splitmux,
3395 "Can't look up request pad in pad map without providing a pad name");
3396 return NULL;
3397 }
3398
3399 ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3400 if (ret) {
3401 GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3402 ret);
3403 return g_strdup (ret);
3404 }
3405
3406 return NULL;
3407 }
3408
3409 static GstPad *
gst_splitmux_sink_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)3410 gst_splitmux_sink_request_new_pad (GstElement * element,
3411 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3412 {
3413 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3414 GstPadTemplate *mux_template = NULL;
3415 GstPad *ret = NULL, *muxpad = NULL;
3416 GstElement *q;
3417 GstPad *q_sink = NULL, *q_src = NULL;
3418 gchar *gname, *qname;
3419 gboolean is_primary_video = FALSE, is_video = FALSE,
3420 muxer_is_requestpad = FALSE;
3421 MqStreamCtx *ctx;
3422 const gchar *muxer_padname = NULL;
3423
3424 GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3425
3426 GST_SPLITMUX_LOCK (splitmux);
3427 if (!create_muxer (splitmux))
3428 goto fail;
3429 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3430
3431 if (g_str_equal (templ->name_template, "video") ||
3432 g_str_has_prefix (templ->name_template, "video_aux_")) {
3433 is_primary_video = g_str_equal (templ->name_template, "video");
3434 if (is_primary_video && splitmux->have_video)
3435 goto already_have_video;
3436 is_video = TRUE;
3437 }
3438
3439 /* See if there's a pad map and it lists this pad */
3440 muxer_padname = lookup_muxer_pad (splitmux, name);
3441
3442 if (muxer_padname == NULL) {
3443 if (is_video) {
3444 /* FIXME: Look for a pad template with matching caps, rather than by name */
3445 GST_DEBUG_OBJECT (element,
3446 "searching for pad-template with name 'video_%%u'");
3447 mux_template =
3448 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3449 (splitmux->muxer), "video_%u");
3450
3451 /* Fallback to find sink pad templates named 'video' (flvmux) */
3452 if (!mux_template) {
3453 GST_DEBUG_OBJECT (element,
3454 "searching for pad-template with name 'video'");
3455 mux_template =
3456 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3457 (splitmux->muxer), "video");
3458 }
3459 name = NULL;
3460 } else {
3461 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3462 templ->name_template);
3463 mux_template =
3464 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3465 (splitmux->muxer), templ->name_template);
3466
3467 /* Fallback to find sink pad templates named 'audio' (flvmux) */
3468 if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3469 GST_DEBUG_OBJECT (element,
3470 "searching for pad-template with name 'audio'");
3471 mux_template =
3472 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3473 (splitmux->muxer), "audio");
3474 name = NULL;
3475 }
3476 }
3477
3478 if (mux_template == NULL) {
3479 GST_DEBUG_OBJECT (element,
3480 "searching for pad-template with name 'sink_%%d'");
3481 mux_template =
3482 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3483 (splitmux->muxer), "sink_%d");
3484 name = NULL;
3485 }
3486 if (mux_template == NULL) {
3487 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3488 mux_template =
3489 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3490 (splitmux->muxer), "sink");
3491 name = NULL;
3492 }
3493
3494 if (mux_template == NULL) {
3495 GST_ERROR_OBJECT (element,
3496 "unable to find a suitable sink pad-template on the muxer");
3497 goto fail;
3498 }
3499 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3500 mux_template->name_template);
3501
3502 if (mux_template->presence == GST_PAD_REQUEST) {
3503 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3504
3505 muxpad =
3506 gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3507 muxer_is_requestpad = TRUE;
3508 } else if (mux_template->presence == GST_PAD_ALWAYS) {
3509 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3510
3511 muxpad =
3512 gst_element_get_static_pad (splitmux->muxer,
3513 mux_template->name_template);
3514 } else {
3515 GST_ERROR_OBJECT (element,
3516 "unexpected pad presence %d", mux_template->presence);
3517 goto fail;
3518 }
3519 } else {
3520 /* Have a muxer pad name */
3521 if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3522 if ((muxpad =
3523 gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3524 muxer_is_requestpad = TRUE;
3525 }
3526 g_free ((gchar *) muxer_padname);
3527 muxer_padname = NULL;
3528 }
3529
3530 /* One way or another, we must have a muxer pad by now */
3531 if (muxpad == NULL)
3532 goto fail;
3533
3534 if (is_primary_video)
3535 gname = g_strdup ("video");
3536 else if (name == NULL)
3537 gname = gst_pad_get_name (muxpad);
3538 else
3539 gname = g_strdup (name);
3540
3541 qname = g_strdup_printf ("queue_%s", gname);
3542 if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3543 g_free (qname);
3544 goto fail;
3545 }
3546 g_free (qname);
3547
3548 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3549
3550 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3551 "max-size-buffers", 5, NULL);
3552
3553 q_sink = gst_element_get_static_pad (q, "sink");
3554 q_src = gst_element_get_static_pad (q, "src");
3555
3556 if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3557 if (muxer_is_requestpad)
3558 gst_element_release_request_pad (splitmux->muxer, muxpad);
3559 gst_object_unref (GST_OBJECT (muxpad));
3560 goto fail;
3561 }
3562
3563 gst_object_unref (GST_OBJECT (muxpad));
3564
3565 ctx = mq_stream_ctx_new (splitmux);
3566 /* Context holds a ref: */
3567 ctx->q = gst_object_ref (q);
3568 ctx->srcpad = q_src;
3569 ctx->sinkpad = q_sink;
3570 ctx->q_overrun_id =
3571 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3572 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3573
3574 ctx->src_pad_block_id =
3575 gst_pad_add_probe (q_src,
3576 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3577 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3578 if (is_primary_video && splitmux->reference_ctx != NULL) {
3579 splitmux->reference_ctx->is_reference = FALSE;
3580 splitmux->reference_ctx = NULL;
3581 }
3582 if (splitmux->reference_ctx == NULL) {
3583 splitmux->reference_ctx = ctx;
3584 ctx->is_reference = TRUE;
3585 }
3586
3587 ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3588 g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3589
3590 ctx->sink_pad_block_id =
3591 gst_pad_add_probe (q_sink,
3592 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3593 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3594 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3595
3596 GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3597 " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3598
3599 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3600
3601 g_free (gname);
3602
3603 if (is_primary_video)
3604 splitmux->have_video = TRUE;
3605
3606 gst_pad_set_active (ret, TRUE);
3607 gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3608
3609 GST_SPLITMUX_UNLOCK (splitmux);
3610
3611 return ret;
3612 fail:
3613 GST_SPLITMUX_UNLOCK (splitmux);
3614
3615 if (q_sink)
3616 gst_object_unref (q_sink);
3617 if (q_src)
3618 gst_object_unref (q_src);
3619 return NULL;
3620 already_have_video:
3621 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3622 GST_SPLITMUX_UNLOCK (splitmux);
3623 return NULL;
3624 }
3625
3626 static void
gst_splitmux_sink_release_pad(GstElement * element,GstPad * pad)3627 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3628 {
3629 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3630 GstPad *muxpad = NULL;
3631 MqStreamCtx *ctx =
3632 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3633
3634 GST_SPLITMUX_LOCK (splitmux);
3635
3636 if (splitmux->muxer == NULL)
3637 goto fail; /* Elements don't exist yet - nothing to release */
3638
3639 GST_INFO_OBJECT (pad, "releasing request pad");
3640
3641 muxpad = gst_pad_get_peer (ctx->srcpad);
3642
3643 /* Remove the context from our consideration */
3644 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3645
3646 GST_SPLITMUX_UNLOCK (splitmux);
3647
3648 if (ctx->sink_pad_block_id) {
3649 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3650 gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3651 }
3652
3653 if (ctx->src_pad_block_id)
3654 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3655
3656 GST_SPLITMUX_LOCK (splitmux);
3657
3658 ctx->is_releasing = TRUE;
3659 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3660
3661 /* Can release the context now */
3662 mq_stream_ctx_free (ctx);
3663 if (ctx == splitmux->reference_ctx)
3664 splitmux->reference_ctx = NULL;
3665
3666 /* Release and free the muxer input */
3667 if (muxpad) {
3668 gst_element_release_request_pad (splitmux->muxer, muxpad);
3669 gst_object_unref (muxpad);
3670 }
3671
3672 if (GST_PAD_PAD_TEMPLATE (pad) &&
3673 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3674 (pad)), "video"))
3675 splitmux->have_video = FALSE;
3676
3677 gst_element_remove_pad (element, pad);
3678
3679 /* Reset the internal elements only after all request pads are released */
3680 if (splitmux->contexts == NULL)
3681 gst_splitmux_reset_elements (splitmux);
3682
3683 /* Wake up other input streams to check if the completion conditions have
3684 * changed */
3685 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3686
3687 fail:
3688 GST_SPLITMUX_UNLOCK (splitmux);
3689 }
3690
3691 static GstElement *
create_element(GstSplitMuxSink * splitmux,const gchar * factory,const gchar * name,gboolean locked)3692 create_element (GstSplitMuxSink * splitmux,
3693 const gchar * factory, const gchar * name, gboolean locked)
3694 {
3695 GstElement *ret = gst_element_factory_make (factory, name);
3696 if (ret == NULL) {
3697 g_warning ("Failed to create %s - splitmuxsink will not work", name);
3698 return NULL;
3699 }
3700
3701 if (locked) {
3702 /* Ensure the sink starts in locked state and NULL - it will be changed
3703 * by the filename setting code */
3704 gst_element_set_locked_state (ret, TRUE);
3705 gst_element_set_state (ret, GST_STATE_NULL);
3706 }
3707
3708 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3709 g_warning ("Could not add %s element - splitmuxsink will not work", name);
3710 gst_object_unref (ret);
3711 return NULL;
3712 }
3713
3714 return ret;
3715 }
3716
3717 static gboolean
create_muxer(GstSplitMuxSink * splitmux)3718 create_muxer (GstSplitMuxSink * splitmux)
3719 {
3720 /* Create internal elements */
3721 if (splitmux->muxer == NULL) {
3722 GstElement *provided_muxer = NULL;
3723
3724 GST_OBJECT_LOCK (splitmux);
3725 if (splitmux->provided_muxer != NULL)
3726 provided_muxer = gst_object_ref (splitmux->provided_muxer);
3727 GST_OBJECT_UNLOCK (splitmux);
3728
3729 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3730 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3731 if ((splitmux->muxer =
3732 create_element (splitmux,
3733 splitmux->muxer_factory ? splitmux->
3734 muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3735 goto fail;
3736 } else if (splitmux->async_finalize) {
3737 if ((splitmux->muxer =
3738 create_element (splitmux, splitmux->muxer_factory, "muxer",
3739 FALSE)) == NULL)
3740 goto fail;
3741 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3742 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3743 splitmux->muxer_preset);
3744 if (splitmux->muxer_properties)
3745 gst_structure_foreach (splitmux->muxer_properties,
3746 _set_property_from_structure, splitmux->muxer);
3747 } else {
3748 /* Ensure it's not in locked state (we might be reusing an old element) */
3749 gst_element_set_locked_state (provided_muxer, FALSE);
3750 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3751 g_warning ("Could not add muxer element - splitmuxsink will not work");
3752 gst_object_unref (provided_muxer);
3753 goto fail;
3754 }
3755
3756 splitmux->muxer = provided_muxer;
3757 gst_object_unref (provided_muxer);
3758 }
3759
3760 if (splitmux->use_robust_muxing) {
3761 update_muxer_properties (splitmux);
3762 }
3763 }
3764
3765 return TRUE;
3766 fail:
3767 return FALSE;
3768 }
3769
3770 static GstElement *
find_sink(GstElement * e)3771 find_sink (GstElement * e)
3772 {
3773 GstElement *res = NULL;
3774 GstIterator *iter;
3775 gboolean done = FALSE;
3776 GValue data = { 0, };
3777
3778 if (!GST_IS_BIN (e))
3779 return e;
3780
3781 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3782 return e;
3783
3784 iter = gst_bin_iterate_sinks (GST_BIN (e));
3785 while (!done) {
3786 switch (gst_iterator_next (iter, &data)) {
3787 case GST_ITERATOR_OK:
3788 {
3789 GstElement *child = g_value_get_object (&data);
3790 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3791 "location") != NULL) {
3792 res = child;
3793 done = TRUE;
3794 }
3795 g_value_reset (&data);
3796 break;
3797 }
3798 case GST_ITERATOR_RESYNC:
3799 gst_iterator_resync (iter);
3800 break;
3801 case GST_ITERATOR_DONE:
3802 done = TRUE;
3803 break;
3804 case GST_ITERATOR_ERROR:
3805 g_assert_not_reached ();
3806 break;
3807 }
3808 }
3809 g_value_unset (&data);
3810 gst_iterator_free (iter);
3811
3812 return res;
3813 }
3814
3815 static gboolean
create_sink(GstSplitMuxSink * splitmux)3816 create_sink (GstSplitMuxSink * splitmux)
3817 {
3818 GstElement *provided_sink = NULL;
3819
3820 if (splitmux->active_sink == NULL) {
3821
3822 GST_OBJECT_LOCK (splitmux);
3823 if (splitmux->provided_sink != NULL)
3824 provided_sink = gst_object_ref (splitmux->provided_sink);
3825 GST_OBJECT_UNLOCK (splitmux);
3826
3827 if ((!splitmux->async_finalize && provided_sink == NULL) ||
3828 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3829 if ((splitmux->sink =
3830 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3831 goto fail;
3832 splitmux->active_sink = splitmux->sink;
3833 } else if (splitmux->async_finalize) {
3834 if ((splitmux->sink =
3835 create_element (splitmux, splitmux->sink_factory, "sink",
3836 TRUE)) == NULL)
3837 goto fail;
3838 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3839 gst_preset_load_preset (GST_PRESET (splitmux->sink),
3840 splitmux->sink_preset);
3841 if (splitmux->sink_properties)
3842 gst_structure_foreach (splitmux->sink_properties,
3843 _set_property_from_structure, splitmux->sink);
3844 splitmux->active_sink = splitmux->sink;
3845 } else {
3846 /* Ensure the sink starts in locked state and NULL - it will be changed
3847 * by the filename setting code */
3848 gst_element_set_locked_state (provided_sink, TRUE);
3849 gst_element_set_state (provided_sink, GST_STATE_NULL);
3850 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3851 g_warning ("Could not add sink elements - splitmuxsink will not work");
3852 gst_object_unref (provided_sink);
3853 goto fail;
3854 }
3855
3856 splitmux->active_sink = provided_sink;
3857
3858 /* The bin holds a ref now, we can drop our tmp ref */
3859 gst_object_unref (provided_sink);
3860
3861 /* Find the sink element */
3862 splitmux->sink = find_sink (splitmux->active_sink);
3863 if (splitmux->sink == NULL) {
3864 g_warning
3865 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3866 goto fail;
3867 }
3868 }
3869
3870 #if 1
3871 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3872 "async") != NULL) {
3873 /* async child elements are causing state change races and weird
3874 * failures, so let's try and turn that off */
3875 g_object_set (splitmux->sink, "async", FALSE, NULL);
3876 }
3877 #endif
3878
3879 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3880 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3881 goto fail;
3882 }
3883 }
3884
3885 return TRUE;
3886 fail:
3887 return FALSE;
3888 }
3889
3890 #ifdef __GNUC__
3891 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3892 #endif
3893 static void
set_next_filename(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)3894 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3895 {
3896 gchar *fname = NULL;
3897 GstSample *sample;
3898 GstCaps *caps;
3899
3900 gst_splitmux_sink_ensure_max_files (splitmux);
3901
3902 if (ctx->cur_out_buffer == NULL) {
3903 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3904 }
3905
3906 caps = gst_pad_get_current_caps (ctx->srcpad);
3907 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3908 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3909 splitmux->fragment_id, sample, &fname);
3910 gst_sample_unref (sample);
3911 if (caps)
3912 gst_caps_unref (caps);
3913
3914 if (fname == NULL) {
3915 /* Fallback to the old signal if the new one returned nothing */
3916 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3917 splitmux->fragment_id, &fname);
3918 }
3919
3920 if (!fname)
3921 fname = splitmux->location ?
3922 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3923
3924 if (fname) {
3925 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3926 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3927 "location") != NULL)
3928 g_object_set (splitmux->sink, "location", fname, NULL);
3929 g_free (fname);
3930 }
3931
3932 splitmux->fragment_id++;
3933 }
3934
3935 /* called with GST_SPLITMUX_LOCK */
3936 static void
do_async_start(GstSplitMuxSink * splitmux)3937 do_async_start (GstSplitMuxSink * splitmux)
3938 {
3939 GstMessage *message;
3940
3941 if (!splitmux->need_async_start) {
3942 GST_INFO_OBJECT (splitmux, "no async_start needed");
3943 return;
3944 }
3945
3946 splitmux->async_pending = TRUE;
3947
3948 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3949 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3950
3951 GST_SPLITMUX_UNLOCK (splitmux);
3952 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3953 (splitmux), message);
3954 GST_SPLITMUX_LOCK (splitmux);
3955 }
3956
3957 /* called with GST_SPLITMUX_LOCK */
3958 static void
do_async_done(GstSplitMuxSink * splitmux)3959 do_async_done (GstSplitMuxSink * splitmux)
3960 {
3961 GstMessage *message;
3962
3963 if (splitmux->async_pending) {
3964 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3965 splitmux->async_pending = FALSE;
3966 GST_SPLITMUX_UNLOCK (splitmux);
3967
3968 message =
3969 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3970 GST_CLOCK_TIME_NONE);
3971 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3972 (splitmux), message);
3973 GST_SPLITMUX_LOCK (splitmux);
3974 }
3975
3976 splitmux->need_async_start = FALSE;
3977 }
3978
3979 static void
gst_splitmux_sink_reset(GstSplitMuxSink * splitmux)3980 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3981 {
3982 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3983 splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3984
3985 splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3986 splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3987 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3988
3989 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3990 g_queue_clear (&splitmux->pending_input_gops);
3991
3992 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3993 splitmux->fragment_total_bytes = 0;
3994 splitmux->fragment_reference_bytes = 0;
3995 splitmux->muxed_out_bytes = 0;
3996 splitmux->ready_for_output = FALSE;
3997
3998 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3999 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4000
4001 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
4002 gst_queue_array_clear (splitmux->times_to_split);
4003
4004 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
4005 splitmux->queued_keyframes = 0;
4006
4007 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
4008 g_queue_clear (&splitmux->out_cmd_q);
4009 }
4010
4011 static GstStateChangeReturn
gst_splitmux_sink_change_state(GstElement * element,GstStateChange transition)4012 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
4013 {
4014 GstStateChangeReturn ret;
4015 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
4016
4017 switch (transition) {
4018 case GST_STATE_CHANGE_NULL_TO_READY:{
4019 GST_SPLITMUX_LOCK (splitmux);
4020 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
4021 ret = GST_STATE_CHANGE_FAILURE;
4022 GST_SPLITMUX_UNLOCK (splitmux);
4023 goto beach;
4024 }
4025 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
4026 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
4027 GST_SPLITMUX_UNLOCK (splitmux);
4028 splitmux->fragment_id = splitmux->start_index;
4029 break;
4030 }
4031 case GST_STATE_CHANGE_READY_TO_PAUSED:{
4032 GST_SPLITMUX_LOCK (splitmux);
4033 /* Make sure contexts and tracking times are cleared, in case we're being reused */
4034 gst_splitmux_sink_reset (splitmux);
4035 /* Start by collecting one input on each pad */
4036 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
4037 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
4038
4039 GST_SPLITMUX_UNLOCK (splitmux);
4040
4041 GST_SPLITMUX_STATE_LOCK (splitmux);
4042 splitmux->shutdown = FALSE;
4043 GST_SPLITMUX_STATE_UNLOCK (splitmux);
4044 break;
4045 }
4046 case GST_STATE_CHANGE_PAUSED_TO_READY:
4047 case GST_STATE_CHANGE_READY_TO_READY:
4048 g_atomic_int_set (&(splitmux->split_requested), FALSE);
4049 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4050 /* Fall through */
4051 case GST_STATE_CHANGE_READY_TO_NULL:
4052 GST_SPLITMUX_STATE_LOCK (splitmux);
4053 splitmux->shutdown = TRUE;
4054 GST_SPLITMUX_STATE_UNLOCK (splitmux);
4055
4056 GST_SPLITMUX_LOCK (splitmux);
4057 gst_splitmux_sink_reset (splitmux);
4058 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
4059 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
4060 /* Wake up any blocked threads */
4061 GST_LOG_OBJECT (splitmux,
4062 "State change -> NULL or READY. Waking threads");
4063 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4064 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4065 GST_SPLITMUX_UNLOCK (splitmux);
4066 break;
4067 default:
4068 break;
4069 }
4070
4071 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4072 if (ret == GST_STATE_CHANGE_FAILURE)
4073 goto beach;
4074
4075 switch (transition) {
4076 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4077 splitmux->need_async_start = TRUE;
4078 break;
4079 case GST_STATE_CHANGE_READY_TO_PAUSED:{
4080 /* Change state async, because our child sink might not
4081 * be ready to do that for us yet if it's state is still locked */
4082
4083 splitmux->need_async_start = TRUE;
4084 /* we want to go async to PAUSED until we managed to configure and add the
4085 * sink */
4086 GST_SPLITMUX_LOCK (splitmux);
4087 do_async_start (splitmux);
4088 GST_SPLITMUX_UNLOCK (splitmux);
4089 ret = GST_STATE_CHANGE_ASYNC;
4090 break;
4091 }
4092 case GST_STATE_CHANGE_READY_TO_NULL:
4093 GST_SPLITMUX_LOCK (splitmux);
4094 splitmux->fragment_id = 0;
4095 /* Reset internal elements only if no pad contexts are using them */
4096 if (splitmux->contexts == NULL)
4097 gst_splitmux_reset_elements (splitmux);
4098 do_async_done (splitmux);
4099 GST_SPLITMUX_UNLOCK (splitmux);
4100 break;
4101 default:
4102 break;
4103 }
4104
4105 return ret;
4106
4107 beach:
4108 if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4109 /* Cleanup elements on failed transition out of NULL */
4110 gst_splitmux_reset_elements (splitmux);
4111 GST_SPLITMUX_LOCK (splitmux);
4112 do_async_done (splitmux);
4113 GST_SPLITMUX_UNLOCK (splitmux);
4114 }
4115 if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4116 /* READY to READY transition only happens when we're already
4117 * in READY state, but a child element is in NULL, which
4118 * happens when there's an error changing the state of the sink.
4119 * We need to make sure not to fail the state transition, or
4120 * the core won't transition us back to NULL successfully */
4121 ret = GST_STATE_CHANGE_SUCCESS;
4122 }
4123 return ret;
4124 }
4125
4126 static void
gst_splitmux_sink_ensure_max_files(GstSplitMuxSink * splitmux)4127 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4128 {
4129 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4130 splitmux->fragment_id = 0;
4131 }
4132 }
4133
4134 static void
split_now(GstSplitMuxSink * splitmux)4135 split_now (GstSplitMuxSink * splitmux)
4136 {
4137 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4138 }
4139
4140 static void
split_after(GstSplitMuxSink * splitmux)4141 split_after (GstSplitMuxSink * splitmux)
4142 {
4143 g_atomic_int_set (&(splitmux->split_requested), TRUE);
4144 }
4145
4146 static void
split_at_running_time(GstSplitMuxSink * splitmux,GstClockTime split_time)4147 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4148 {
4149 gboolean send_keyframe_requests;
4150
4151 GST_SPLITMUX_LOCK (splitmux);
4152 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4153 send_keyframe_requests = splitmux->send_keyframe_requests;
4154 GST_SPLITMUX_UNLOCK (splitmux);
4155
4156 if (send_keyframe_requests) {
4157 GstEvent *ev =
4158 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4159 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4160 GST_TIME_ARGS (split_time));
4161 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4162 GST_WARNING_OBJECT (splitmux,
4163 "Could not request keyframe at %" GST_TIME_FORMAT,
4164 GST_TIME_ARGS (split_time));
4165 }
4166 }
4167 }
4168