1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /**
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
23 *
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
28 * desired.
29 *
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
32 *
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
35 *
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
40 *
41 * In the async-finalize mode, when the threshold is crossed, the old muxer
42 * and sink are disconnected from the pipeline and left to finish the file
43 * asynchronously, and a new muxer and sink are created to continue with the
44 * next fragment. For that reason, instead of muxer and sink objects, the
45 * muxer-factory and sink-factory properties are used to construct the new
46 * objects, together with muxer-properties and sink-properties.
47 *
48 * <refsect2>
49 * <title>Example pipelines</title>
50 * |[
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52 * ]|
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
56 *
57 * |[
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59 * ]|
60 * Records a video stream captured from a v4l2 device and muxes it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
63 * </refsect2>
64 */
65
66 #ifdef HAVE_CONFIG_H
67 #include "config.h"
68 #endif
69
70 #include <string.h>
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
74
/* Debug category for this file. GST_CAT_DEFAULT makes it the category used
 * by the GST_*_OBJECT logging macros below. */
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_CAT_DEFAULT splitmux_debug

/* Take / release the element's own mutex (the `lock` member). */
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
/* Wait on / wake all waiters of `input_cond`. g_cond_wait() is handed the
 * same `lock` mutex, so the caller must hold GST_SPLITMUX_LOCK when waiting. */
#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)

/* Same pair for `output_cond`, again paired with the `lock` mutex. */
#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
85
/* Handlers for the split-now, split-after and split-at-running-time signals.
 * class_init stores them in the class structure, and the signals are
 * registered with the matching class offsets. */
static void split_now (GstSplitMuxSink * splitmux);
static void split_after (GstSplitMuxSink * splitmux);
static void split_at_running_time (GstSplitMuxSink * splitmux,
    GstClockTime split_time);
90
/* Property ids. Each one is installed in class_init and dispatched on in
 * gst_splitmux_sink_set_property() / gst_splitmux_sink_get_property().
 * PROP_0 only occupies id 0 and is never installed. */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_MAX_SIZE_TIME,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIMECODE,
  PROP_SEND_KEYFRAME_REQUESTS,
  PROP_MAX_FILES,
  PROP_MUXER_OVERHEAD,
  PROP_USE_ROBUST_MUXING,
  PROP_ALIGNMENT_THRESHOLD,
  PROP_MUXER,
  PROP_SINK,
  PROP_RESET_MUXER,
  PROP_ASYNC_FINALIZE,
  PROP_MUXER_FACTORY,
  PROP_MUXER_PROPERTIES,
  PROP_SINK_FACTORY,
  PROP_SINK_PROPERTIES
};
112
/* Property defaults, used both for the GParamSpecs in class_init and for the
 * instance fields in gst_splitmux_sink_init(). */
/* 0 disables the limit (see the property descriptions) */
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
#define DEFAULT_MAX_FILES 0
/* Fraction of extra size attributed to muxing: 0.02 = 2% */
#define DEFAULT_MUXER_OVERHEAD 0.02
#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_ALIGNMENT_THRESHOLD 0
/* Element factory names copied into muxer_factory / sink_factory at init */
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
#define DEFAULT_USE_ROBUST_MUXING FALSE
#define DEFAULT_RESET_MUXER TRUE
#define DEFAULT_ASYNC_FINALIZE FALSE
124
/* Pairs a stream context with a pad.
 * NOTE(review): presumably handed to a helper that sends EOS asynchronously
 * in async-finalize mode; its users are not visible in this part of the
 * file, so confirm before relying on that. */
typedef struct _AsyncEosHelper
{
  MqStreamCtx *ctx;
  GstPad *pad;
} AsyncEosHelper;
130
/* Indices into signals[]; one per signal registered in class_init.
 * SIGNAL_LAST is not a signal, it only sizes the array. */
enum
{
  SIGNAL_FORMAT_LOCATION,
  SIGNAL_FORMAT_LOCATION_FULL,
  SIGNAL_SPLIT_NOW,
  SIGNAL_SPLIT_AFTER,
  SIGNAL_SPLIT_AT_RUNNING_TIME,
  SIGNAL_MUXER_ADDED,
  SIGNAL_SINK_ADDED,
  SIGNAL_LAST
};

/* Signal ids returned by g_signal_new(), filled in by class_init */
static guint signals[SIGNAL_LAST];
144
/* Request sink pad templates, all accepting any caps. "video" has a fixed
 * name, while the audio, subtitle and caption templates are numbered
 * ("%u"). */
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate audio_sink_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate subtitle_sink_template =
GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate caption_sink_template =
GST_STATIC_PAD_TEMPLATE ("caption_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);
165
/* Quarks interned once in _do_init() */
static GQuark PAD_CONTEXT;
/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
 * to forward an incoming EOS message, but we cannot rely on the state of the
 * splitmux anymore, so we set this qdata on the sink instead.
 * The muxer and sink must be destroyed after both of these things have
 * finished:
 * 1) The EOS message has been sent when the fragment is ending
 * 2) The muxer has been unlinked and relinked
 * Therefore, EOS_FROM_US can have these values:
 * 0: EOS was not requested from us. Forward the message. The muxer and the
 *    sink will be destroyed together with the rest of the bin.
 * 1: EOS was requested from us, but the other of the two tasks hasn't
 *    finished. Set EOS_FROM_US to 2 and do your stuff.
 * 2: EOS was requested from us and the other of the two tasks has finished.
 *    Now we can destroy the muxer and the sink.
 */
static GQuark EOS_FROM_US;
static GQuark RUNNING_TIME;
184
185 static void
_do_init(void)186 _do_init (void)
187 {
188 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
189 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
190 RUNNING_TIME = g_quark_from_static_string ("running-time");
191 }
192
/* Alias so the parent class pointer generated by the type macro can be
 * referred to as `parent_class` (used when chaining up in dispose/finalize).
 * _do_init() is passed as the extra code to run during type registration. */
#define gst_splitmux_sink_parent_class parent_class
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
    _do_init ());
196
/* Forward declarations for functions defined later in this file. */

/* Child element construction */
static gboolean create_muxer (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
/* GObject virtual methods, installed in class_init */
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_splitmux_sink_dispose (GObject * object);
static void gst_splitmux_sink_finalize (GObject * object);

/* GstElement request-pad virtual methods, installed in class_init */
static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
    GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);

/* GstElement state change virtual method, installed in class_init */
static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
    element, GstStateChange transition);

/* bus_handler is installed as the GstBin handle_message vfunc in class_init */
static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
/* Used by finalize to release any stream contexts still in the list */
static void mq_stream_ctx_free (MqStreamCtx * ctx);
static void grow_blocked_queues (GstSplitMuxSink * splitmux);

static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
static GstElement *create_element (GstSplitMuxSink * splitmux,
    const gchar * factory, const gchar * name, gboolean locked);

static void do_async_done (GstSplitMuxSink * splitmux);
224
225 static MqStreamBuf *
mq_stream_buf_new(void)226 mq_stream_buf_new (void)
227 {
228 return g_slice_new0 (MqStreamBuf);
229 }
230
231 static void
mq_stream_buf_free(MqStreamBuf * data)232 mq_stream_buf_free (MqStreamBuf * data)
233 {
234 g_slice_free (MqStreamBuf, data);
235 }
236
237 static SplitMuxOutputCommand *
out_cmd_buf_new(void)238 out_cmd_buf_new (void)
239 {
240 return g_slice_new0 (SplitMuxOutputCommand);
241 }
242
243 static void
out_cmd_buf_free(SplitMuxOutputCommand * data)244 out_cmd_buf_free (SplitMuxOutputCommand * data)
245 {
246 g_slice_free (SplitMuxOutputCommand, data);
247 }
248
249 static void
gst_splitmux_sink_class_init(GstSplitMuxSinkClass * klass)250 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
251 {
252 GObjectClass *gobject_class = (GObjectClass *) klass;
253 GstElementClass *gstelement_class = (GstElementClass *) klass;
254 GstBinClass *gstbin_class = (GstBinClass *) klass;
255
256 gobject_class->set_property = gst_splitmux_sink_set_property;
257 gobject_class->get_property = gst_splitmux_sink_get_property;
258 gobject_class->dispose = gst_splitmux_sink_dispose;
259 gobject_class->finalize = gst_splitmux_sink_finalize;
260
261 gst_element_class_set_static_metadata (gstelement_class,
262 "Split Muxing Bin", "Generic/Bin/Muxer",
263 "Convenience bin that muxes incoming streams into multiple time/size limited files",
264 "Jan Schmidt <jan@centricular.com>");
265
266 gst_element_class_add_static_pad_template (gstelement_class,
267 &video_sink_template);
268 gst_element_class_add_static_pad_template (gstelement_class,
269 &audio_sink_template);
270 gst_element_class_add_static_pad_template (gstelement_class,
271 &subtitle_sink_template);
272 gst_element_class_add_static_pad_template (gstelement_class,
273 &caption_sink_template);
274
275 gstelement_class->change_state =
276 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
277 gstelement_class->request_new_pad =
278 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
279 gstelement_class->release_pad =
280 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
281
282 gstbin_class->handle_message = bus_handler;
283
284 g_object_class_install_property (gobject_class, PROP_LOCATION,
285 g_param_spec_string ("location", "File Output Pattern",
286 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
287 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
289 g_param_spec_double ("mux-overhead", "Muxing Overhead",
290 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
291 DEFAULT_MUXER_OVERHEAD,
292 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
293
294 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
295 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
296 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
297 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
300 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
301 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
303 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
304 "Maximum difference in timecode between first and last frame. "
305 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
306 "Will only be effective if a timecode track is present.",
307 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
309 g_param_spec_boolean ("send-keyframe-requests",
310 "Request keyframes at max-size-time",
311 "Request a keyframe every max-size-time ns to try splitting at that point. "
312 "Needs max-size-bytes to be 0 in order to be effective.",
313 DEFAULT_SEND_KEYFRAME_REQUESTS,
314 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
316 g_param_spec_uint ("max-files", "Max files",
317 "Maximum number of files to keep on disk. Once the maximum is reached,"
318 "old files start to be deleted to make room for new ones.", 0,
319 G_MAXUINT, DEFAULT_MAX_FILES,
320 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
322 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
323 "Allow non-reference streams to be that many ns before the reference"
324 " stream",
325 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
326 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327
328 g_object_class_install_property (gobject_class, PROP_MUXER,
329 g_param_spec_object ("muxer", "Muxer",
330 "The muxer element to use (NULL = default mp4mux). "
331 "Valid only for async-finalize = FALSE",
332 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333 g_object_class_install_property (gobject_class, PROP_SINK,
334 g_param_spec_object ("sink", "Sink",
335 "The sink element (or element chain) to use (NULL = default filesink). "
336 "Valid only for async-finalize = FALSE",
337 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338
339 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
340 g_param_spec_boolean ("use-robust-muxing",
341 "Support robust-muxing mode of some muxers",
342 "Check if muxers support robust muxing via the reserved-max-duration and "
343 "reserved-duration-remaining properties and use them if so. "
344 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
345 " create new fragments if the reserved header space is about to overflow. "
346 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
347 "manually by the app to a non-zero value for robust muxing to have an effect.",
348 DEFAULT_USE_ROBUST_MUXING,
349 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
352 g_param_spec_boolean ("reset-muxer",
353 "Reset Muxer",
354 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
355 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
358 g_param_spec_boolean ("async-finalize",
359 "Finalize fragments asynchronously",
360 "Finalize each fragment asynchronously and start a new one",
361 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
363 g_param_spec_string ("muxer-factory", "Muxer factory",
364 "The muxer element factory to use (default = mp4mux). "
365 "Valid only for async-finalize = TRUE",
366 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
368 g_param_spec_boxed ("muxer-properties", "Muxer properties",
369 "The muxer element properties to use. "
370 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
371 "Valid only for async-finalize = TRUE",
372 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
374 g_param_spec_string ("sink-factory", "Sink factory",
375 "The sink element factory to use (default = filesink). "
376 "Valid only for async-finalize = TRUE",
377 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
379 g_param_spec_boxed ("sink-properties", "Sink properties",
380 "The sink element properties to use. "
381 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
382 "Valid only for async-finalize = TRUE",
383 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
384
385 /**
386 * GstSplitMuxSink::format-location:
387 * @splitmux: the #GstSplitMuxSink
388 * @fragment_id: the sequence number of the file to be created
389 *
390 * Returns: the location to be used for the next output file
391 */
392 signals[SIGNAL_FORMAT_LOCATION] =
393 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
394 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
395
396 /**
397 * GstSplitMuxSink::format-location-full:
398 * @splitmux: the #GstSplitMuxSink
399 * @fragment_id: the sequence number of the file to be created
400 * @first_sample: A #GstSample containing the first buffer
401 * from the reference stream in the new file
402 *
403 * Returns: the location to be used for the next output file
404 */
405 signals[SIGNAL_FORMAT_LOCATION_FULL] =
406 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
407 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
408 GST_TYPE_SAMPLE);
409
410 /**
411 * GstSplitMuxSink::split-now:
412 * @splitmux: the #GstSplitMuxSink
413 *
414 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
415 * The current GOP will be output to the new file.
416 *
417 * Since: 1.14
418 */
419 signals[SIGNAL_SPLIT_NOW] =
420 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
421 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
422 split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
423
424 /**
425 * GstSplitMuxSink::split-after:
426 * @splitmux: the #GstSplitMuxSink
427 *
428 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
429 * The current GOP will be output to the old file.
430 *
431 * Since: 1.16
432 */
433 signals[SIGNAL_SPLIT_AFTER] =
434 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
435 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
436 split_after), NULL, NULL, NULL, G_TYPE_NONE, 0);
437
438 /**
439 * GstSplitMuxSink::split-now:
440 * @splitmux: the #GstSplitMuxSink
441 *
442 * When called by the user, this action signal splits the video file (and
443 * begins a new one) as soon as the given running time is reached. If this
444 * action signal is called multiple times, running times are queued up and
445 * processed in the order they were given.
446 *
447 * Note that this is prone to race conditions, where said running time is
448 * reached and surpassed before we had a chance to split. The file will
449 * still split immediately, but in order to make sure that the split doesn't
450 * happen too late, it is recommended to call this action signal from
451 * something that will prevent further buffers from flowing into
452 * splitmuxsink before the split is completed, such as a pad probe before
453 * splitmuxsink.
454 *
455 *
456 * Since: 1.16
457 */
458 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
459 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
460 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
461 split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1,
462 G_TYPE_UINT64);
463
464 /**
465 * GstSplitMuxSink::muxer-added:
466 * @splitmux: the #GstSplitMuxSink
467 * @muxer: the newly added muxer element
468 *
469 * Since: 1.14
470 */
471 signals[SIGNAL_MUXER_ADDED] =
472 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
473 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
474
475 /**
476 * GstSplitMuxSink::sink-added:
477 * @splitmux: the #GstSplitMuxSink
478 * @sink: the newly added sink element
479 *
480 * Since: 1.14
481 */
482 signals[SIGNAL_SINK_ADDED] =
483 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
484 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
485
486 klass->split_now = split_now;
487 klass->split_after = split_after;
488 klass->split_at_running_time = split_at_running_time;
489 }
490
491 static void
gst_splitmux_sink_init(GstSplitMuxSink * splitmux)492 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
493 {
494 g_mutex_init (&splitmux->lock);
495 g_cond_init (&splitmux->input_cond);
496 g_cond_init (&splitmux->output_cond);
497 g_queue_init (&splitmux->out_cmd_q);
498
499 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
500 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
501 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
502 splitmux->max_files = DEFAULT_MAX_FILES;
503 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
504 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
505 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
506 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
507 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
508
509 splitmux->threshold_timecode_str = NULL;
510
511 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
512 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
513 splitmux->muxer_properties = NULL;
514 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
515 splitmux->sink_properties = NULL;
516
517 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
518 splitmux->split_requested = FALSE;
519 splitmux->do_split_next_gop = FALSE;
520 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
521 }
522
523 static void
gst_splitmux_reset(GstSplitMuxSink * splitmux)524 gst_splitmux_reset (GstSplitMuxSink * splitmux)
525 {
526 if (splitmux->muxer) {
527 gst_element_set_locked_state (splitmux->muxer, TRUE);
528 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
529 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
530 }
531 if (splitmux->active_sink) {
532 gst_element_set_locked_state (splitmux->active_sink, TRUE);
533 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
534 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
535 }
536
537 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
538 }
539
540 static void
gst_splitmux_sink_dispose(GObject * object)541 gst_splitmux_sink_dispose (GObject * object)
542 {
543 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
544
545 /* Calling parent dispose invalidates all child pointers */
546 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
547
548 G_OBJECT_CLASS (parent_class)->dispose (object);
549 }
550
551 static void
gst_splitmux_sink_finalize(GObject * object)552 gst_splitmux_sink_finalize (GObject * object)
553 {
554 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
555 g_cond_clear (&splitmux->input_cond);
556 g_cond_clear (&splitmux->output_cond);
557 g_mutex_clear (&splitmux->lock);
558 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
559 g_queue_clear (&splitmux->out_cmd_q);
560
561 if (splitmux->provided_sink)
562 gst_object_unref (splitmux->provided_sink);
563 if (splitmux->provided_muxer)
564 gst_object_unref (splitmux->provided_muxer);
565
566 if (splitmux->muxer_factory)
567 g_free (splitmux->muxer_factory);
568 if (splitmux->muxer_properties)
569 gst_structure_free (splitmux->muxer_properties);
570 if (splitmux->sink_factory)
571 g_free (splitmux->sink_factory);
572 if (splitmux->sink_properties)
573 gst_structure_free (splitmux->sink_properties);
574
575 if (splitmux->threshold_timecode_str)
576 g_free (splitmux->threshold_timecode_str);
577
578 if (splitmux->times_to_split)
579 gst_queue_array_free (splitmux->times_to_split);
580
581 g_free (splitmux->location);
582
583 /* Make sure to free any un-released contexts. There should not be any,
584 * because the dispose will have freed all request pads though */
585 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
586 g_list_free (splitmux->contexts);
587
588 G_OBJECT_CLASS (parent_class)->finalize (object);
589 }
590
591 /*
592 * Set any time threshold to the muxer, if it has
593 * reserved-max-duration and reserved-duration-remaining
594 * properties. Called when creating/claiming the muxer
595 * in create_elements() */
596 static void
update_muxer_properties(GstSplitMuxSink * sink)597 update_muxer_properties (GstSplitMuxSink * sink)
598 {
599 GObjectClass *klass;
600 GstClockTime threshold_time;
601
602 sink->muxer_has_reserved_props = FALSE;
603 if (sink->muxer == NULL)
604 return;
605 klass = G_OBJECT_GET_CLASS (sink->muxer);
606 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
607 return;
608 if (g_object_class_find_property (klass,
609 "reserved-duration-remaining") == NULL)
610 return;
611 sink->muxer_has_reserved_props = TRUE;
612
613 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
614 GST_TIME_ARGS (sink->threshold_time));
615 GST_OBJECT_LOCK (sink);
616 threshold_time = sink->threshold_time;
617 GST_OBJECT_UNLOCK (sink);
618
619 if (threshold_time > 0) {
620 /* Tell the muxer how much space to reserve */
621 GstClockTime muxer_threshold = threshold_time;
622 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
623 }
624 }
625
626 static void
gst_splitmux_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)627 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
628 const GValue * value, GParamSpec * pspec)
629 {
630 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
631
632 switch (prop_id) {
633 case PROP_LOCATION:{
634 GST_OBJECT_LOCK (splitmux);
635 g_free (splitmux->location);
636 splitmux->location = g_value_dup_string (value);
637 GST_OBJECT_UNLOCK (splitmux);
638 break;
639 }
640 case PROP_MAX_SIZE_BYTES:
641 GST_OBJECT_LOCK (splitmux);
642 splitmux->threshold_bytes = g_value_get_uint64 (value);
643 GST_OBJECT_UNLOCK (splitmux);
644 break;
645 case PROP_MAX_SIZE_TIME:
646 GST_OBJECT_LOCK (splitmux);
647 splitmux->threshold_time = g_value_get_uint64 (value);
648 GST_OBJECT_UNLOCK (splitmux);
649 break;
650 case PROP_MAX_SIZE_TIMECODE:
651 GST_OBJECT_LOCK (splitmux);
652 splitmux->threshold_timecode_str = g_value_dup_string (value);
653 GST_OBJECT_UNLOCK (splitmux);
654 break;
655 case PROP_SEND_KEYFRAME_REQUESTS:
656 GST_OBJECT_LOCK (splitmux);
657 splitmux->send_keyframe_requests = g_value_get_boolean (value);
658 GST_OBJECT_UNLOCK (splitmux);
659 break;
660 case PROP_MAX_FILES:
661 GST_OBJECT_LOCK (splitmux);
662 splitmux->max_files = g_value_get_uint (value);
663 GST_OBJECT_UNLOCK (splitmux);
664 break;
665 case PROP_MUXER_OVERHEAD:
666 GST_OBJECT_LOCK (splitmux);
667 splitmux->mux_overhead = g_value_get_double (value);
668 GST_OBJECT_UNLOCK (splitmux);
669 break;
670 case PROP_USE_ROBUST_MUXING:
671 GST_OBJECT_LOCK (splitmux);
672 splitmux->use_robust_muxing = g_value_get_boolean (value);
673 GST_OBJECT_UNLOCK (splitmux);
674 if (splitmux->use_robust_muxing)
675 update_muxer_properties (splitmux);
676 break;
677 case PROP_ALIGNMENT_THRESHOLD:
678 GST_OBJECT_LOCK (splitmux);
679 splitmux->alignment_threshold = g_value_get_uint64 (value);
680 GST_OBJECT_UNLOCK (splitmux);
681 break;
682 case PROP_SINK:
683 GST_OBJECT_LOCK (splitmux);
684 if (splitmux->provided_sink)
685 gst_object_unref (splitmux->provided_sink);
686 splitmux->provided_sink = g_value_get_object (value);
687 gst_object_ref_sink (splitmux->provided_sink);
688 GST_OBJECT_UNLOCK (splitmux);
689 break;
690 case PROP_MUXER:
691 GST_OBJECT_LOCK (splitmux);
692 if (splitmux->provided_muxer)
693 gst_object_unref (splitmux->provided_muxer);
694 splitmux->provided_muxer = g_value_get_object (value);
695 gst_object_ref_sink (splitmux->provided_muxer);
696 GST_OBJECT_UNLOCK (splitmux);
697 break;
698 case PROP_RESET_MUXER:
699 GST_OBJECT_LOCK (splitmux);
700 splitmux->reset_muxer = g_value_get_boolean (value);
701 GST_OBJECT_UNLOCK (splitmux);
702 break;
703 case PROP_ASYNC_FINALIZE:
704 GST_OBJECT_LOCK (splitmux);
705 splitmux->async_finalize = g_value_get_boolean (value);
706 GST_OBJECT_UNLOCK (splitmux);
707 break;
708 case PROP_MUXER_FACTORY:
709 GST_OBJECT_LOCK (splitmux);
710 if (splitmux->muxer_factory)
711 g_free (splitmux->muxer_factory);
712 splitmux->muxer_factory = g_value_dup_string (value);
713 GST_OBJECT_UNLOCK (splitmux);
714 break;
715 case PROP_MUXER_PROPERTIES:
716 GST_OBJECT_LOCK (splitmux);
717 if (splitmux->muxer_properties)
718 gst_structure_free (splitmux->muxer_properties);
719 if (gst_value_get_structure (value))
720 splitmux->muxer_properties =
721 gst_structure_copy (gst_value_get_structure (value));
722 else
723 splitmux->muxer_properties = NULL;
724 GST_OBJECT_UNLOCK (splitmux);
725 break;
726 case PROP_SINK_FACTORY:
727 GST_OBJECT_LOCK (splitmux);
728 if (splitmux->sink_factory)
729 g_free (splitmux->sink_factory);
730 splitmux->sink_factory = g_value_dup_string (value);
731 GST_OBJECT_UNLOCK (splitmux);
732 break;
733 case PROP_SINK_PROPERTIES:
734 GST_OBJECT_LOCK (splitmux);
735 if (splitmux->sink_properties)
736 gst_structure_free (splitmux->sink_properties);
737 if (gst_value_get_structure (value))
738 splitmux->sink_properties =
739 gst_structure_copy (gst_value_get_structure (value));
740 else
741 splitmux->sink_properties = NULL;
742 GST_OBJECT_UNLOCK (splitmux);
743 break;
744 default:
745 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
746 break;
747 }
748 }
749
750 static void
gst_splitmux_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)751 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
752 GValue * value, GParamSpec * pspec)
753 {
754 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755
756 switch (prop_id) {
757 case PROP_LOCATION:
758 GST_OBJECT_LOCK (splitmux);
759 g_value_set_string (value, splitmux->location);
760 GST_OBJECT_UNLOCK (splitmux);
761 break;
762 case PROP_MAX_SIZE_BYTES:
763 GST_OBJECT_LOCK (splitmux);
764 g_value_set_uint64 (value, splitmux->threshold_bytes);
765 GST_OBJECT_UNLOCK (splitmux);
766 break;
767 case PROP_MAX_SIZE_TIME:
768 GST_OBJECT_LOCK (splitmux);
769 g_value_set_uint64 (value, splitmux->threshold_time);
770 GST_OBJECT_UNLOCK (splitmux);
771 break;
772 case PROP_MAX_SIZE_TIMECODE:
773 GST_OBJECT_LOCK (splitmux);
774 g_value_set_string (value, splitmux->threshold_timecode_str);
775 GST_OBJECT_UNLOCK (splitmux);
776 break;
777 case PROP_SEND_KEYFRAME_REQUESTS:
778 GST_OBJECT_LOCK (splitmux);
779 g_value_set_boolean (value, splitmux->send_keyframe_requests);
780 GST_OBJECT_UNLOCK (splitmux);
781 break;
782 case PROP_MAX_FILES:
783 GST_OBJECT_LOCK (splitmux);
784 g_value_set_uint (value, splitmux->max_files);
785 GST_OBJECT_UNLOCK (splitmux);
786 break;
787 case PROP_MUXER_OVERHEAD:
788 GST_OBJECT_LOCK (splitmux);
789 g_value_set_double (value, splitmux->mux_overhead);
790 GST_OBJECT_UNLOCK (splitmux);
791 break;
792 case PROP_USE_ROBUST_MUXING:
793 GST_OBJECT_LOCK (splitmux);
794 g_value_set_boolean (value, splitmux->use_robust_muxing);
795 GST_OBJECT_UNLOCK (splitmux);
796 break;
797 case PROP_ALIGNMENT_THRESHOLD:
798 GST_OBJECT_LOCK (splitmux);
799 g_value_set_uint64 (value, splitmux->alignment_threshold);
800 GST_OBJECT_UNLOCK (splitmux);
801 break;
802 case PROP_SINK:
803 GST_OBJECT_LOCK (splitmux);
804 g_value_set_object (value, splitmux->provided_sink);
805 GST_OBJECT_UNLOCK (splitmux);
806 break;
807 case PROP_MUXER:
808 GST_OBJECT_LOCK (splitmux);
809 g_value_set_object (value, splitmux->provided_muxer);
810 GST_OBJECT_UNLOCK (splitmux);
811 break;
812 case PROP_RESET_MUXER:
813 GST_OBJECT_LOCK (splitmux);
814 g_value_set_boolean (value, splitmux->reset_muxer);
815 GST_OBJECT_UNLOCK (splitmux);
816 break;
817 case PROP_ASYNC_FINALIZE:
818 GST_OBJECT_LOCK (splitmux);
819 g_value_set_boolean (value, splitmux->async_finalize);
820 GST_OBJECT_UNLOCK (splitmux);
821 break;
822 case PROP_MUXER_FACTORY:
823 GST_OBJECT_LOCK (splitmux);
824 g_value_set_string (value, splitmux->muxer_factory);
825 GST_OBJECT_UNLOCK (splitmux);
826 break;
827 case PROP_MUXER_PROPERTIES:
828 GST_OBJECT_LOCK (splitmux);
829 gst_value_set_structure (value, splitmux->muxer_properties);
830 GST_OBJECT_UNLOCK (splitmux);
831 break;
832 case PROP_SINK_FACTORY:
833 GST_OBJECT_LOCK (splitmux);
834 g_value_set_string (value, splitmux->sink_factory);
835 GST_OBJECT_UNLOCK (splitmux);
836 break;
837 case PROP_SINK_PROPERTIES:
838 GST_OBJECT_LOCK (splitmux);
839 gst_value_set_structure (value, splitmux->sink_properties);
840 GST_OBJECT_UNLOCK (splitmux);
841 break;
842 default:
843 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
844 break;
845 }
846 }
847
848 /* Convenience function */
849 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)850 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
851 {
852 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
853
854 if (GST_CLOCK_TIME_IS_VALID (val)) {
855 gboolean sign =
856 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
857 if (sign > 0)
858 res = val;
859 else if (sign < 0)
860 res = -val;
861 }
862 return res;
863 }
864
865 static MqStreamCtx *
mq_stream_ctx_new(GstSplitMuxSink * splitmux)866 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
867 {
868 MqStreamCtx *ctx;
869
870 ctx = g_new0 (MqStreamCtx, 1);
871 ctx->splitmux = splitmux;
872 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
873 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
874 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
875 g_queue_init (&ctx->queued_bufs);
876 return ctx;
877 }
878
879 static void
mq_stream_ctx_free(MqStreamCtx * ctx)880 mq_stream_ctx_free (MqStreamCtx * ctx)
881 {
882 if (ctx->q) {
883 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
884
885 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
886
887 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
888 gst_element_set_locked_state (ctx->q, TRUE);
889 gst_element_set_state (ctx->q, GST_STATE_NULL);
890 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
891 gst_object_unref (parent);
892 }
893 gst_object_unref (ctx->q);
894 }
895 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
896 gst_object_unref (ctx->sinkpad);
897 gst_object_unref (ctx->srcpad);
898 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
899 g_queue_clear (&ctx->queued_bufs);
900 g_free (ctx);
901 }
902
903 static void
send_fragment_opened_closed_msg(GstSplitMuxSink * splitmux,gboolean opened,GstElement * sink)904 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
905 GstElement * sink)
906 {
907 gchar *location = NULL;
908 GstMessage *msg;
909 const gchar *msg_name = opened ?
910 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
911 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
912
913 if (!opened) {
914 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
915 if (rtime)
916 running_time = *rtime;
917 }
918
919 g_object_get (sink, "location", &location, NULL);
920
921 /* If it's in the middle of a teardown, the reference_ctc might have become
922 * NULL */
923 if (splitmux->reference_ctx) {
924 msg = gst_message_new_element (GST_OBJECT (splitmux),
925 gst_structure_new (msg_name,
926 "location", G_TYPE_STRING, location,
927 "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
928 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
929 }
930
931 g_free (location);
932 }
933
934 static void
send_eos_async(GstSplitMuxSink * splitmux,AsyncEosHelper * helper)935 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
936 {
937 GstEvent *eos;
938 GstPad *pad;
939 MqStreamCtx *ctx;
940
941 eos = gst_event_new_eos ();
942 pad = helper->pad;
943 ctx = helper->ctx;
944
945 GST_SPLITMUX_LOCK (splitmux);
946 if (!pad)
947 pad = gst_pad_get_peer (ctx->srcpad);
948 GST_SPLITMUX_UNLOCK (splitmux);
949
950 gst_pad_send_event (pad, eos);
951 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
952
953 gst_object_unref (pad);
954 g_free (helper);
955 }
956
957 /* Called with lock held, drops the lock to send EOS to the
958 * pad
959 */
960 static void
send_eos(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)961 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
962 {
963 GstEvent *eos;
964 GstPad *pad;
965
966 eos = gst_event_new_eos ();
967 pad = gst_pad_get_peer (ctx->srcpad);
968
969 ctx->out_eos = TRUE;
970
971 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
972 GST_SPLITMUX_UNLOCK (splitmux);
973 gst_pad_send_event (pad, eos);
974 GST_SPLITMUX_LOCK (splitmux);
975
976 gst_object_unref (pad);
977 }
978
979 /* Called with lock held. Schedules an EOS event to the ctx pad
980 * to happen in another thread */
981 static void
eos_context_async(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)982 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
983 {
984 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
985 GstPad *srcpad, *sinkpad;
986
987 srcpad = ctx->srcpad;
988 sinkpad = gst_pad_get_peer (srcpad);
989
990 helper->ctx = ctx;
991 helper->pad = sinkpad; /* Takes the reference */
992
993 ctx->out_eos_async_done = TRUE;
994 /* HACK: Here, we explicitly unset the SINK flag on the target sink element
995 * that's about to be asynchronously disposed, so that it no longer
996 * participates in GstBin EOS logic. This fixes a race where if
997 * splitmuxsink really reaches EOS before an asynchronous background
998 * element has finished, then the bin won't actually send EOS to the
999 * pipeline. Even after finishing and removing the old element, the
1000 * bin doesn't re-check EOS status on removing a SINK element. This
1001 * should be fixed in core, making this hack unnecessary. */
1002 GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
1003
1004 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1005 sinkpad, ctx);
1006
1007 g_assert_nonnull (helper->pad);
1008 gst_element_call_async (GST_ELEMENT (splitmux),
1009 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1010 }
1011
1012 /* Called with lock held. TRUE iff all contexts have a
1013 * pending (or delivered) async eos event */
1014 static gboolean
all_contexts_are_async_eos(GstSplitMuxSink * splitmux)1015 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1016 {
1017 gboolean ret = TRUE;
1018 GList *item;
1019
1020 for (item = splitmux->contexts; item; item = item->next) {
1021 MqStreamCtx *ctx = item->data;
1022 ret &= ctx->out_eos_async_done;
1023 }
1024 return ret;
1025 }
1026
/* Called with splitmux lock held to check if this output
 * context needs to sleep to wait for the release of the
 * next GOP, or to send EOS to close out the current file.
 *
 * Returns immediately while a caps change is pending on the context.
 * Otherwise it loops until the context may output (its running time is
 * below the current maximum, or no maximum is set), the context is
 * flushing, or the output state is STOPPED. While looping it drives the
 * output state machine and sleeps on the output condition
 * (GST_SPLITMUX_WAIT_OUTPUT) whenever there is nothing to do.
 */
static void
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
  if (ctx->caps_change)
    return;

  do {
    /* When first starting up, the reference stream has to output
     * the first buffer to prepare the muxer and sink */
    gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
    GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;

    /* Pull the limit in by the alignment threshold, unless the limit is
     * 0 / unset, the threshold is 0, or the limit is smaller than the
     * threshold */
    if (!(splitmux->max_out_running_time == 0 ||
            splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
            splitmux->alignment_threshold == 0 ||
            splitmux->max_out_running_time < splitmux->alignment_threshold)) {
      my_max_out_running_time -= splitmux->alignment_threshold;
      GST_LOG_OBJECT (ctx->srcpad,
          "Max out running time currently %" GST_STIME_FORMAT
          ", with threshold applied it is %" GST_STIME_FORMAT,
          GST_STIME_ARGS (splitmux->max_out_running_time),
          GST_STIME_ARGS (my_max_out_running_time));
    }

    /* Never block a flushing context or a stopped element */
    if (ctx->flushing
        || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
      return;

    GST_LOG_OBJECT (ctx->srcpad,
        "Checking running time %" GST_STIME_FORMAT " against max %"
        GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
        GST_STIME_ARGS (my_max_out_running_time));

    if (can_output) {
      /* Still inside the released range (or no limit at all): let the
       * caller continue */
      if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
          ctx->out_running_time < my_max_out_running_time) {
        return;
      }

      switch (splitmux->output_state) {
        case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
          /* We only get here if we've finished outputting a GOP and need to know
           * what to do next */
          splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
          GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
          continue;

        case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
          /* We've reached the max out running_time to get here, so end this file now */
          if (ctx->out_eos == FALSE) {
            if (splitmux->async_finalize) {
              /* We must set EOS asynchronously at this point. We cannot defer
               * it, because we need all contexts to wake up, for the
               * reference context to eventually give us something at
               * START_NEXT_FILE. Otherwise, collectpads might choose another
               * context to give us the first buffer, and format-location-full
               * will not contain a valid sample. */
              g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
                  GINT_TO_POINTER (1));
              eos_context_async (ctx, splitmux);
              if (all_contexts_are_async_eos (splitmux)) {
                GST_INFO_OBJECT (splitmux,
                    "All contexts are async_eos. Moving to the next file.");
                /* We can start the next file once we've asked each pad to go EOS */
                splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
                GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
                continue;
              }
            } else {
              /* Synchronous path: send_eos() drops and re-takes the lock,
               * so loop around and re-check the state */
              send_eos (splitmux, ctx);
              continue;
            }
          } else {
            GST_INFO_OBJECT (splitmux,
                "At end-of-file state, but context %p is already EOS", ctx);
          }
          break;
        case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
          if (ctx->is_reference) {
            /* Special handling on the reference ctx to start new fragments
             * and collect commands from the command queue */
            /* drops the splitmux lock briefly: */
            /* We must have reference ctx in order for format-location-full to
             * have a sample */
            start_next_fragment (splitmux, ctx);
            continue;
          }
          break;
        case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
          /* Take the next command from the tail of the output command
           * queue, sleeping on the output condition while it is empty */
          do {
            SplitMuxOutputCommand *cmd =
                g_queue_pop_tail (&splitmux->out_cmd_q);
            if (cmd != NULL) {
              /* If we pop the last command, we need to make our queues bigger */
              if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
                grow_blocked_queues (splitmux);

              if (cmd->start_new_fragment) {
                GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
                splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
              } else {
                GST_DEBUG_OBJECT (splitmux,
                    "Got new output cmd for time %" GST_STIME_FORMAT,
                    GST_STIME_ARGS (cmd->max_output_ts));

                /* Extend the output range immediately */
                splitmux->max_out_running_time = cmd->max_output_ts;
                splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
              }
              GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);

              out_cmd_buf_free (cmd);
              break;
            } else {
              GST_SPLITMUX_WAIT_OUTPUT (splitmux);
              /* ohos.ext.func.0008
               * On GST_EVENT_FLUSH_START, ctx->flushing is set to TRUE and
               * GST_SPLITMUX_BROADCAST_OUTPUT (splitmux) is called to wake
               * us up. This loop must stop once flushing is set.
               */
#ifdef OHOS_EXT_FUNC
              if (ctx->flushing) {
                GST_DEBUG_OBJECT (ctx->srcpad, "splitmuxsink is flushing, break.");
                break;
              }
#endif
            }
          } while (splitmux->output_state ==
              SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
          /* loop and re-check the state */
          continue;
        }
        case SPLITMUX_OUTPUT_STATE_STOPPED:
          return;
      }
    }

    /* Nothing can be done for this context right now: sleep until another
     * thread signals the output condition */
    GST_INFO_OBJECT (ctx->srcpad,
        "Sleeping for running time %"
        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
        GST_STIME_ARGS (ctx->out_running_time),
        GST_STIME_ARGS (splitmux->max_out_running_time));
    GST_SPLITMUX_WAIT_OUTPUT (splitmux);
    GST_INFO_OBJECT (ctx->srcpad,
        "Woken for new max running time %" GST_STIME_FORMAT,
        GST_STIME_ARGS (splitmux->max_out_running_time));
  }
  while (1);
}
1180
1181 static GstClockTime
calculate_next_max_timecode(GstSplitMuxSink * splitmux,const GstVideoTimeCode * cur_tc)1182 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1183 const GstVideoTimeCode * cur_tc)
1184 {
1185 GstVideoTimeCode *target_tc;
1186 GstVideoTimeCodeInterval *tc_inter;
1187 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1188
1189 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1190 return GST_CLOCK_TIME_NONE;
1191
1192 tc_inter =
1193 gst_video_time_code_interval_new_from_string
1194 (splitmux->threshold_timecode_str);
1195 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1196 gst_video_time_code_interval_free (tc_inter);
1197
1198 /* Convert to ns */
1199 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1200 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1201
1202 /* Add fragment_start_time, accounting for wraparound */
1203 if (target_tc_time >= cur_tc_time) {
1204 next_max_tc_time =
1205 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1206 } else {
1207 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1208
1209 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1210 (cur_tc->config.fps_d == 1001)) {
1211 /* Checking fps_d is probably unneeded, but better safe than sorry
1212 * (e.g. someone accidentally set a flag) */
1213 GstVideoTimeCode *tc_for_offset;
1214
1215 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1216 * but slightly less. Calculate that duration from a fake timecode. The
1217 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1218 * is to add one frame to 23:59:59;29 */
1219 tc_for_offset =
1220 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1221 NULL, cur_tc->config.flags, 23, 59, 59,
1222 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1223 day_in_ns =
1224 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1225 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1226 cur_tc->config.fps_n);
1227 gst_video_time_code_free (tc_for_offset);
1228 }
1229 next_max_tc_time =
1230 day_in_ns - cur_tc_time + target_tc_time +
1231 splitmux->fragment_start_time;
1232 }
1233
1234 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1235 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1236 GST_TIME_ARGS (cur_tc_time));
1237 gst_video_time_code_free (target_tc);
1238
1239 return next_max_tc_time;
1240 }
1241
1242 static gboolean
request_next_keyframe(GstSplitMuxSink * splitmux,GstBuffer * buffer)1243 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1244 {
1245 GstEvent *ev;
1246 GstClockTime target_time;
1247 gboolean timecode_based = FALSE;
1248
1249 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1250 if (splitmux->threshold_timecode_str) {
1251 GstVideoTimeCodeMeta *tc_meta;
1252
1253 if (buffer != NULL) {
1254 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1255 if (tc_meta) {
1256 splitmux->next_max_tc_time =
1257 calculate_next_max_timecode (splitmux, &tc_meta->tc);
1258 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1259 }
1260 } else {
1261 /* This can happen in the presence of GAP events that trigger
1262 * a new fragment start */
1263 GST_WARNING_OBJECT (splitmux,
1264 "No buffer available to calculate next timecode");
1265 }
1266 }
1267
1268 if (splitmux->send_keyframe_requests == FALSE
1269 || (splitmux->threshold_time == 0 && !timecode_based)
1270 || splitmux->threshold_bytes != 0)
1271 return TRUE;
1272
1273 if (timecode_based) {
1274 /* We might have rounding errors: aim slightly earlier */
1275 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1276 } else {
1277 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1278 }
1279 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1280 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1281 GST_TIME_ARGS (target_time));
1282 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1283 }
1284
/* Pad probe callback for the output side of a stream context.
 *
 * Buffer lists are dropped with a warning. Downstream and flush events
 * update per-context state (output segment, flushing, EOS, running time)
 * and, for non-reference contexts, may block in complete_or_wait_on_out()
 * before being passed on. Buffers pop their bookkeeping entry from
 * queued_bufs, update the context's output running time and block in
 * complete_or_wait_on_out() until they may be released.
 *
 * Returns the probe verdict for the item: PASS to forward it, DROP to
 * discard it, HANDLED when the event was stored or consumed here. */
static GstPadProbeReturn
handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
  GstSplitMuxSink *splitmux = ctx->splitmux;
  MqStreamBuf *buf_info = NULL;

  GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);

  /* FIXME: Handle buffer lists, until then make it clear they won't work */
  if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
    g_warning ("Buffer list handling not implemented");
    return GST_PAD_PROBE_DROP;
  }
  if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
      info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
    GstEvent *event = gst_pad_probe_info_get_event (info);
    /* Tracks whether a case below already took the splitmux lock */
    gboolean locked = FALSE;

    GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);

    switch (GST_EVENT_TYPE (event)) {
      case GST_EVENT_SEGMENT:
        gst_event_copy_segment (event, &ctx->out_segment);
        break;
      case GST_EVENT_FLUSH_STOP:
        /* Reset the output segment and discard all queued buffer info */
        GST_SPLITMUX_LOCK (splitmux);
        locked = TRUE;
        gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
        g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
        g_queue_clear (&ctx->queued_bufs);
        ctx->flushing = FALSE;
        break;
      case GST_EVENT_FLUSH_START:
        /* Mark the context as flushing and wake up any waiter */
        GST_SPLITMUX_LOCK (splitmux);
        locked = TRUE;
        GST_LOG_OBJECT (pad, "Flush start");
        ctx->flushing = TRUE;
        GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
        break;
      case GST_EVENT_EOS:
        GST_SPLITMUX_LOCK (splitmux);
        locked = TRUE;
        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
          goto beach;
        ctx->out_eos = TRUE;
        GST_INFO_OBJECT (splitmux,
            "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
        break;
      case GST_EVENT_GAP:{
        GstClockTime gap_ts;
        GstClockTimeDiff rtime;

        gst_event_parse_gap (event, &gap_ts, NULL);
        if (gap_ts == GST_CLOCK_TIME_NONE)
          break;

        GST_SPLITMUX_LOCK (splitmux);
        locked = TRUE;

        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
          goto beach;

        /* When we get a gap event on the
         * reference stream and we're trying to open a
         * new file, we need to store it until we get
         * the buffer afterwards
         */
        if (ctx->is_reference &&
            (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
          GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
          /* NOTE(review): gst_event_replace() takes its own reference and
           * the probe's reference is not released before returning HANDLED
           * here - confirm this does not leak one reference per stored
           * GAP event */
          gst_event_replace (&ctx->pending_gap, event);
          GST_SPLITMUX_UNLOCK (splitmux);
          return GST_PAD_PROBE_HANDLED;
        }

        rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);

        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
            GST_STIME_ARGS (rtime));

        if (rtime != GST_CLOCK_STIME_NONE) {
          ctx->out_running_time = rtime;
          complete_or_wait_on_out (splitmux, ctx);
        }
        break;
      }
      case GST_EVENT_CUSTOM_DOWNSTREAM:{
        const GstStructure *s;
        GstClockTimeDiff ts = 0;

        /* Only the "splitmuxsink-unblock" event is handled here: it
         * carries a "timestamp" used as the new output running time and
         * is never forwarded */
        s = gst_event_get_structure (event);
        if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
          break;

        gst_structure_get_int64 (s, "timestamp", &ts);

        GST_SPLITMUX_LOCK (splitmux);
        locked = TRUE;

        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
          goto beach;
        ctx->out_running_time = ts;
        if (!ctx->is_reference)
          complete_or_wait_on_out (splitmux, ctx);
        GST_SPLITMUX_UNLOCK (splitmux);
        return GST_PAD_PROBE_DROP;
      }
      case GST_EVENT_CAPS:{
        GstPad *peer;

        if (!ctx->is_reference)
          break;

        /* Offer the new caps to the peer first; nothing more to do if
         * there is no peer or the peer accepts them */
        peer = gst_pad_get_peer (pad);
        if (peer) {
          gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));

          gst_object_unref (peer);

          if (ok)
            break;

        } else {
          break;
        }
        /* This is in the case the muxer doesn't allow this change of caps */
        GST_SPLITMUX_LOCK (splitmux);
        locked = TRUE;
        ctx->caps_change = TRUE;

        if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
          GST_DEBUG_OBJECT (splitmux,
              "New caps were not accepted. Switching output file");
          if (ctx->out_eos == FALSE) {
            splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
            GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
          }
        }

        /* Lets it fall through, if it fails again, then the muxer just can't
         * support this format, but at least we have a closed file.
         */
        break;
      }
      default:
        break;
    }

    /* We need to make sure events aren't passed
     * until the muxer / sink are ready for it */
    if (!locked)
      GST_SPLITMUX_LOCK (splitmux);
    /* ohos.ext.func.0008
     * On GST_EVENT_FLUSH_STOP there was a rare freeze.
     * FLUSH_START and FLUSH_STOP must not wait for a command in
     * complete_or_wait_on_out(), because the upstream component would be
     * blocked if these two events waited in that function.
     */
#ifdef OHOS_EXT_FUNC
    if (!ctx->is_reference && (GST_EVENT_TYPE(event) != GST_EVENT_FLUSH_START) &&
        (GST_EVENT_TYPE(event) != GST_EVENT_FLUSH_STOP))
#else
    if (!ctx->is_reference)
#endif
      complete_or_wait_on_out (splitmux, ctx);
    GST_SPLITMUX_UNLOCK (splitmux);

    /* Don't try to forward sticky events before the next buffer is there
     * because it would cause a new file to be created without the first
     * buffer being available.
     */
    if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
      gst_event_unref (event);
      return GST_PAD_PROBE_HANDLED;
    } else
      return GST_PAD_PROBE_PASS;
  }

  /* Allow everything through until the configured next stopping point */
  GST_SPLITMUX_LOCK (splitmux);

  buf_info = g_queue_pop_tail (&ctx->queued_bufs);
  if (buf_info == NULL)
    /* Can only happen due to a poorly timed flush */
    goto beach;

  /* If we have popped a keyframe, decrement the queued_gop count */
  if (buf_info->keyframe && splitmux->queued_keyframes > 0)
    splitmux->queued_keyframes--;

  ctx->out_running_time = buf_info->run_ts;
  ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);

  GST_LOG_OBJECT (splitmux,
      "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
      " size %" G_GUINT64_FORMAT,
      pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);

  /* A buffer has arrived, so a pending caps change no longer holds back
   * this context */
  ctx->caps_change = FALSE;

  complete_or_wait_on_out (splitmux, ctx);

  splitmux->muxed_out_bytes += buf_info->buf_size;

#ifndef GST_DISABLE_GST_DEBUG
  {
    GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
    GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
        " run ts %" GST_STIME_FORMAT, buf,
        GST_STIME_ARGS (ctx->out_running_time));
  }
#endif

  ctx->cur_out_buffer = NULL;
  GST_SPLITMUX_UNLOCK (splitmux);

  /* pending_gap is protected by the STREAM lock */
  if (ctx->pending_gap) {
    /* If we previously stored a gap event, send it now */
    /* NOTE(review): the peer is not checked for NULL before use - confirm
     * the pad is always linked at this point */
    GstPad *peer = gst_pad_get_peer (ctx->srcpad);

    GST_DEBUG_OBJECT (splitmux,
        "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);

    /* Sending the event hands over the stored reference */
    gst_pad_send_event (peer, ctx->pending_gap);
    ctx->pending_gap = NULL;

    gst_object_unref (peer);
  }

  mq_stream_buf_free (buf_info);

  return GST_PAD_PROBE_PASS;

beach:
  /* Reached with the splitmux lock held */
  GST_SPLITMUX_UNLOCK (splitmux);
  return GST_PAD_PROBE_DROP;
}
1522
1523 static gboolean
resend_sticky(GstPad * pad,GstEvent ** event,GstPad * peer)1524 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1525 {
1526 return gst_pad_send_event (peer, gst_event_ref (*event));
1527 }
1528
1529 static void
unlock_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1530 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1531 {
1532 if (ctx->fragment_block_id > 0) {
1533 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1534 ctx->fragment_block_id = 0;
1535 }
1536 }
1537
1538 static void
restart_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1539 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1540 {
1541 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1542
1543 gst_pad_sticky_events_foreach (ctx->srcpad,
1544 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1545
1546 /* Clear EOS flag if not actually EOS */
1547 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1548 ctx->out_eos_async_done = ctx->out_eos;
1549
1550 gst_object_unref (peer);
1551 }
1552
1553 static void
relink_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1554 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1555 {
1556 GstPad *sinkpad, *srcpad, *newpad;
1557 GstPadTemplate *templ;
1558
1559 srcpad = ctx->srcpad;
1560 sinkpad = gst_pad_get_peer (srcpad);
1561
1562 templ = sinkpad->padtemplate;
1563 newpad =
1564 gst_element_request_pad (splitmux->muxer, templ,
1565 GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1566
1567 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1568 newpad);
1569 if (!gst_pad_unlink (srcpad, sinkpad)) {
1570 gst_object_unref (sinkpad);
1571 goto fail;
1572 }
1573 if (gst_pad_link_full (srcpad, newpad,
1574 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1575 gst_element_release_request_pad (splitmux->muxer, newpad);
1576 gst_object_unref (sinkpad);
1577 gst_object_unref (newpad);
1578 goto fail;
1579 }
1580 gst_object_unref (newpad);
1581 gst_object_unref (sinkpad);
1582 return;
1583
1584 fail:
1585 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1586 ("Could not create the new muxer/sink"), NULL);
1587 }
1588
1589 static GstPadProbeReturn
_block_pad(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1590 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1591 {
1592 return GST_PAD_PROBE_OK;
1593 }
1594
1595 static void
block_context(MqStreamCtx * ctx,GstSplitMuxSink * splitmux)1596 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1597 {
1598 ctx->fragment_block_id =
1599 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1600 NULL, NULL);
1601 }
1602
1603 static gboolean
_set_property_from_structure(GQuark field_id,const GValue * value,gpointer user_data)1604 _set_property_from_structure (GQuark field_id, const GValue * value,
1605 gpointer user_data)
1606 {
1607 const gchar *property_name = g_quark_to_string (field_id);
1608 GObject *element = G_OBJECT (user_data);
1609
1610 g_object_set_property (element, property_name, value);
1611
1612 return TRUE;
1613 }
1614
1615 static void
_lock_and_set_to_null(GstElement * element,GstSplitMuxSink * splitmux)1616 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1617 {
1618 gst_element_set_locked_state (element, TRUE);
1619 gst_element_set_state (element, GST_STATE_NULL);
1620 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1621 gst_bin_remove (GST_BIN (splitmux), element);
1622 }
1623
1624
1625 static void
_send_event(const GValue * value,gpointer user_data)1626 _send_event (const GValue * value, gpointer user_data)
1627 {
1628 GstPad *pad = g_value_get_object (value);
1629 GstEvent *ev = user_data;
1630
1631 gst_pad_send_event (pad, gst_event_ref (ev));
1632 }
1633
1634 /* Called with lock held when a fragment
1635 * reaches EOS and it is time to restart
1636 * a new fragment
1637 */
1638 static void
start_next_fragment(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)1639 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1640 {
1641 GstElement *muxer, *sink;
1642
1643 g_assert (ctx->is_reference);
1644
1645 /* 1 change to new file */
1646 splitmux->switching_fragment = TRUE;
1647
1648 /* We need to drop the splitmux lock to acquire the state lock
1649 * here and ensure there's no racy state change going on elsewhere */
1650 muxer = gst_object_ref (splitmux->muxer);
1651 sink = gst_object_ref (splitmux->active_sink);
1652
1653 GST_SPLITMUX_UNLOCK (splitmux);
1654 GST_STATE_LOCK (splitmux);
1655
1656 if (splitmux->async_finalize) {
1657 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1658 gchar *newname;
1659 GstElement *new_sink, *new_muxer;
1660
1661 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1662 splitmux->fragment_id);
1663 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1664 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1665 GST_SPLITMUX_LOCK (splitmux);
1666 if ((splitmux->sink =
1667 create_element (splitmux, splitmux->sink_factory, newname,
1668 TRUE)) == NULL)
1669 goto fail;
1670 if (splitmux->sink_properties)
1671 gst_structure_foreach (splitmux->sink_properties,
1672 _set_property_from_structure, splitmux->sink);
1673 splitmux->active_sink = splitmux->sink;
1674 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1675 g_free (newname);
1676 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1677 if ((splitmux->muxer =
1678 create_element (splitmux, splitmux->muxer_factory, newname,
1679 TRUE)) == NULL)
1680 goto fail;
1681 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1682 "async") != NULL) {
1683 /* async child elements are causing state change races and weird
1684 * failures, so let's try and turn that off */
1685 g_object_set (splitmux->sink, "async", FALSE, NULL);
1686 }
1687 if (splitmux->muxer_properties)
1688 gst_structure_foreach (splitmux->muxer_properties,
1689 _set_property_from_structure, splitmux->muxer);
1690 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1691 g_free (newname);
1692 new_sink = splitmux->sink;
1693 new_muxer = splitmux->muxer;
1694 GST_SPLITMUX_UNLOCK (splitmux);
1695 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1696 gst_element_link (new_muxer, new_sink);
1697
1698 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1699 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1700 EOS_FROM_US)) == 2) {
1701 _lock_and_set_to_null (muxer, splitmux);
1702 _lock_and_set_to_null (sink, splitmux);
1703 } else {
1704 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1705 GINT_TO_POINTER (2));
1706 }
1707 }
1708 gst_object_unref (muxer);
1709 gst_object_unref (sink);
1710 muxer = new_muxer;
1711 sink = new_sink;
1712 gst_object_ref (muxer);
1713 gst_object_ref (sink);
1714 }
1715 } else {
1716
1717 gst_element_set_locked_state (muxer, TRUE);
1718 gst_element_set_locked_state (sink, TRUE);
1719 gst_element_set_state (sink, GST_STATE_NULL);
1720
1721 if (splitmux->reset_muxer) {
1722 gst_element_set_state (muxer, GST_STATE_NULL);
1723 } else {
1724 GstIterator *it = gst_element_iterate_sink_pads (muxer);
1725 GstEvent *ev;
1726
1727 ev = gst_event_new_flush_start ();
1728 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1729 gst_event_unref (ev);
1730
1731 gst_iterator_resync (it);
1732
1733 ev = gst_event_new_flush_stop (TRUE);
1734 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1735 gst_event_unref (ev);
1736
1737 gst_iterator_free (it);
1738 }
1739 }
1740
1741 GST_SPLITMUX_LOCK (splitmux);
1742 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1743 set_next_filename (splitmux, ctx);
1744 splitmux->muxed_out_bytes = 0;
1745 GST_SPLITMUX_UNLOCK (splitmux);
1746
1747 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1748 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1749 gst_element_set_locked_state (muxer, FALSE);
1750 gst_element_set_locked_state (sink, FALSE);
1751
1752 gst_object_unref (sink);
1753 gst_object_unref (muxer);
1754
1755 GST_SPLITMUX_LOCK (splitmux);
1756 GST_STATE_UNLOCK (splitmux);
1757 splitmux->switching_fragment = FALSE;
1758 do_async_done (splitmux);
1759
1760 splitmux->ready_for_output = TRUE;
1761
1762 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1763 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1764
1765 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1766
1767 /* FIXME: Is this always the correct next state? */
1768 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1769 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1770 return;
1771
1772 fail:
1773 GST_STATE_UNLOCK (splitmux);
1774 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1775 ("Could not create the new muxer/sink"), NULL);
1776 }
1777
1778 static void
bus_handler(GstBin * bin,GstMessage * message)1779 bus_handler (GstBin * bin, GstMessage * message)
1780 {
1781 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1782
1783 switch (GST_MESSAGE_TYPE (message)) {
1784 case GST_MESSAGE_EOS:{
1785 /* If the state is draining out the current file, drop this EOS */
1786 GstElement *sink;
1787
1788 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1789 GST_SPLITMUX_LOCK (splitmux);
1790
1791 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1792
1793 if (splitmux->async_finalize) {
1794
1795 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1796 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1797 EOS_FROM_US)) == 2) {
1798 GstElement *muxer;
1799 GstPad *sinksink, *muxersrc;
1800
1801 sinksink = gst_element_get_static_pad (sink, "sink");
1802 muxersrc = gst_pad_get_peer (sinksink);
1803 muxer = gst_pad_get_parent_element (muxersrc);
1804 gst_object_unref (sinksink);
1805 gst_object_unref (muxersrc);
1806
1807 gst_element_call_async (muxer,
1808 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1809 gst_object_ref (splitmux), gst_object_unref);
1810 gst_element_call_async (sink,
1811 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1812 gst_object_ref (splitmux), gst_object_unref);
1813 gst_object_unref (muxer);
1814 } else {
1815 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1816 GINT_TO_POINTER (2));
1817 }
1818 GST_DEBUG_OBJECT (splitmux,
1819 "Caught async EOS from previous muxer+sink. Dropping.");
1820 /* We forward the EOS so that it gets aggregated as normal. If the sink
1821 * finishes and is removed before the end, it will be de-aggregated */
1822 gst_message_unref (message);
1823 GST_SPLITMUX_UNLOCK (splitmux);
1824 return;
1825 }
1826 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1827 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1828 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1829 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1830
1831 gst_message_unref (message);
1832 GST_SPLITMUX_UNLOCK (splitmux);
1833 return;
1834 } else {
1835 GST_DEBUG_OBJECT (splitmux,
1836 "Passing EOS message. Output state %d max_out_running_time %"
1837 GST_STIME_FORMAT, splitmux->output_state,
1838 GST_STIME_ARGS (splitmux->max_out_running_time));
1839 }
1840 GST_SPLITMUX_UNLOCK (splitmux);
1841 break;
1842 }
1843 case GST_MESSAGE_ASYNC_START:
1844 case GST_MESSAGE_ASYNC_DONE:
1845 /* Ignore state changes from our children while switching */
1846 GST_SPLITMUX_LOCK (splitmux);
1847 if (splitmux->switching_fragment) {
1848 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1849 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1850 GST_LOG_OBJECT (splitmux,
1851 "Ignoring state change from child %" GST_PTR_FORMAT
1852 " while switching", GST_MESSAGE_SRC (message));
1853 gst_message_unref (message);
1854 GST_SPLITMUX_UNLOCK (splitmux);
1855 return;
1856 }
1857 }
1858 GST_SPLITMUX_UNLOCK (splitmux);
1859 break;
1860 case GST_MESSAGE_WARNING:
1861 {
1862 GError *gerror = NULL;
1863
1864 gst_message_parse_warning (message, &gerror, NULL);
1865
1866 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1867 GList *item;
1868 gboolean caps_change = FALSE;
1869
1870 GST_SPLITMUX_LOCK (splitmux);
1871
1872 for (item = splitmux->contexts; item; item = item->next) {
1873 MqStreamCtx *ctx = item->data;
1874
1875 if (ctx->caps_change) {
1876 caps_change = TRUE;
1877 break;
1878 }
1879 }
1880
1881 GST_SPLITMUX_UNLOCK (splitmux);
1882
1883 if (caps_change) {
1884 GST_LOG_OBJECT (splitmux,
1885 "Ignoring warning change from child %" GST_PTR_FORMAT
1886 " while switching caps", GST_MESSAGE_SRC (message));
1887 gst_message_unref (message);
1888 return;
1889 }
1890 }
1891 break;
1892 }
1893 default:
1894 break;
1895 }
1896
1897 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1898 }
1899
1900 static void
ctx_set_unblock(MqStreamCtx * ctx)1901 ctx_set_unblock (MqStreamCtx * ctx)
1902 {
1903 ctx->need_unblock = TRUE;
1904 }
1905
1906 static gboolean
need_new_fragment(GstSplitMuxSink * splitmux,GstClockTime queued_time,GstClockTime queued_gop_time,guint64 queued_bytes)1907 need_new_fragment (GstSplitMuxSink * splitmux,
1908 GstClockTime queued_time, GstClockTime queued_gop_time,
1909 guint64 queued_bytes)
1910 {
1911 guint64 thresh_bytes;
1912 GstClockTime thresh_time;
1913 gboolean check_robust_muxing;
1914 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
1915 GstClockTime *ptr_to_time;
1916
1917 GST_OBJECT_LOCK (splitmux);
1918 thresh_bytes = splitmux->threshold_bytes;
1919 thresh_time = splitmux->threshold_time;
1920 ptr_to_time = (GstClockTime *)
1921 gst_queue_array_peek_head_struct (splitmux->times_to_split);
1922 if (ptr_to_time)
1923 time_to_split = *ptr_to_time;
1924 check_robust_muxing = splitmux->use_robust_muxing
1925 && splitmux->muxer_has_reserved_props;
1926 GST_OBJECT_UNLOCK (splitmux);
1927
1928 /* Have we muxed anything into the new file at all? */
1929 if (splitmux->fragment_total_bytes <= 0)
1930 return FALSE;
1931
1932 /* User told us to split now */
1933 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE)
1934 return TRUE;
1935
1936 /* User told us to split at this running time */
1937 if (splitmux->reference_ctx->in_running_time > time_to_split) {
1938 GST_OBJECT_LOCK (splitmux);
1939 /* Dequeue running time */
1940 gst_queue_array_pop_head_struct (splitmux->times_to_split);
1941 /* Empty any running times after this that are past now */
1942 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1943 while (ptr_to_time) {
1944 time_to_split = *ptr_to_time;
1945 if (splitmux->reference_ctx->in_running_time <= time_to_split) {
1946 break;
1947 }
1948 gst_queue_array_pop_head_struct (splitmux->times_to_split);
1949 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1950 }
1951 GST_OBJECT_UNLOCK (splitmux);
1952 return TRUE;
1953 }
1954
1955 if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1956 return TRUE; /* Would overrun byte limit */
1957
1958 if (thresh_time > 0 && queued_time > thresh_time)
1959 return TRUE; /* Would overrun byte limit */
1960
1961 /* Timecode-based threshold accounts for possible rounding errors:
1962 * 5us should be bigger than all possible rounding errors but nowhere near
1963 * big enough to skip to another frame */
1964 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1965 splitmux->reference_ctx->in_running_time >
1966 splitmux->next_max_tc_time + 5 * GST_USECOND)
1967 return TRUE; /* Timecode threshold */
1968
1969 if (check_robust_muxing) {
1970 GstClockTime mux_reserved_remain;
1971
1972 g_object_get (splitmux->muxer,
1973 "reserved-duration-remaining", &mux_reserved_remain, NULL);
1974
1975 GST_LOG_OBJECT (splitmux,
1976 "Muxer robust muxing report - %" G_GUINT64_FORMAT
1977 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1978 mux_reserved_remain, queued_gop_time);
1979
1980 if (queued_gop_time >= mux_reserved_remain) {
1981 GST_INFO_OBJECT (splitmux,
1982 "File is about to run out of header room - %" G_GUINT64_FORMAT
1983 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1984 ". Switching to new file", mux_reserved_remain, queued_gop_time);
1985 return TRUE;
1986 }
1987 }
1988
1989 /* Continue and mux this GOP */
1990 return FALSE;
1991 }
1992
1993 /* Called with splitmux lock held */
1994 /* Called when entering ProcessingCompleteGop state
1995 * Assess if mq contents overflowed the current file
1996 * -> If yes, need to switch to new file
1997 * -> if no, set max_out_running_time to let this GOP in and
1998 * go to COLLECTING_GOP_START state
1999 */
2000 static void
handle_gathered_gop(GstSplitMuxSink * splitmux)2001 handle_gathered_gop (GstSplitMuxSink * splitmux)
2002 {
2003 guint64 queued_bytes;
2004 GstClockTimeDiff queued_time = 0;
2005 GstClockTimeDiff queued_gop_time = 0;
2006 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2007 SplitMuxOutputCommand *cmd;
2008
2009 /* Assess if the multiqueue contents overflowed the current file */
2010 /* When considering if a newly gathered GOP overflows
2011 * the time limit for the file, only consider the running time of the
2012 * reference stream. Other streams might have run ahead a little bit,
2013 * but extra pieces won't be released to the muxer beyond the reference
2014 * stream cut-off anyway - so it forms the limit. */
2015 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2016 queued_time = splitmux->reference_ctx->in_running_time;
2017 /* queued_gop_time tracks how much unwritten data there is waiting to
2018 * be written to this fragment including this GOP */
2019 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2020 queued_gop_time =
2021 splitmux->reference_ctx->in_running_time -
2022 splitmux->reference_ctx->out_running_time;
2023 else
2024 queued_gop_time =
2025 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2026
2027 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2028
2029 g_assert (queued_gop_time >= 0);
2030 g_assert (queued_time >= splitmux->fragment_start_time);
2031
2032 queued_time -= splitmux->fragment_start_time;
2033 if (queued_time < queued_gop_time)
2034 queued_gop_time = queued_time;
2035
2036 /* Expand queued bytes estimate by muxer overhead */
2037 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2038
2039 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2040 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
2041 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
2042 GST_LOG_OBJECT (splitmux,
2043 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
2044 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
2045 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
2046 }
2047
2048 /* Check for overrun - have we output at least one byte and overrun
2049 * either threshold? */
2050 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2051 if (splitmux->async_finalize) {
2052 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2053 *sink_running_time = splitmux->reference_ctx->out_running_time;
2054 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2055 RUNNING_TIME, sink_running_time, g_free);
2056 }
2057 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2058 /* Tell the output side to start a new fragment */
2059 GST_INFO_OBJECT (splitmux,
2060 "This GOP (dur %" GST_STIME_FORMAT
2061 ") would overflow the fragment, Sending start_new_fragment cmd",
2062 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2063 splitmux->gop_start_time));
2064 cmd = out_cmd_buf_new ();
2065 cmd->start_new_fragment = TRUE;
2066 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2067 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2068
2069 new_out_ts = splitmux->reference_ctx->in_running_time;
2070 splitmux->fragment_start_time = splitmux->gop_start_time;
2071 splitmux->fragment_total_bytes = 0;
2072
2073 if (request_next_keyframe (splitmux,
2074 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
2075 GST_WARNING_OBJECT (splitmux,
2076 "Could not request a keyframe. Files may not split at the exact location they should");
2077 }
2078 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2079 }
2080
2081 /* And set up to collect the next GOP */
2082 if (!splitmux->reference_ctx->in_eos) {
2083 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2084 splitmux->gop_start_time = new_out_ts;
2085 } else {
2086 /* This is probably already the current state, but just in case: */
2087 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2088 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
2089 }
2090
2091 /* And wake all input contexts to send a wake-up event */
2092 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2093 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2094
2095 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2096 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2097
2098 if (splitmux->gop_total_bytes > 0) {
2099 GST_LOG_OBJECT (splitmux,
2100 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2101 " time %" GST_STIME_FORMAT,
2102 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2103
2104 /* Send this GOP to the output command queue */
2105 cmd = out_cmd_buf_new ();
2106 cmd->start_new_fragment = FALSE;
2107 cmd->max_output_ts = new_out_ts;
2108 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2109 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2110 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2111
2112 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2113 }
2114
2115 splitmux->gop_total_bytes = 0;
2116 }
2117
2118 /* Called with splitmux lock held */
2119 /* Called from each input pad when it is has all the pieces
2120 * for a GOP or EOS, starting with the reference pad which has set the
2121 * splitmux->max_in_running_time
2122 */
2123 static void
check_completed_gop(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)2124 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2125 {
2126 GList *cur;
2127 GstEvent *event;
2128
2129 /* On ENDING_FILE, the reference stream sends a command to start a new
2130 * fragment, then releases the GOP for output in the new fragment.
2131 * If somes streams received no buffer during the last GOP that overran,
2132 * because its next buffer has a timestamp bigger than
2133 * ctx->max_in_running_time, its queue is empty. In that case the only
2134 * way to wakeup the output thread is by injecting an event in the
2135 * queue. This usually happen with subtitle streams.
2136 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2137 if (ctx->need_unblock) {
2138 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2139 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2140 GST_EVENT_TYPE_SERIALIZED,
2141 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2142 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2143
2144 GST_SPLITMUX_UNLOCK (splitmux);
2145 gst_pad_send_event (ctx->sinkpad, event);
2146 GST_SPLITMUX_LOCK (splitmux);
2147
2148 ctx->need_unblock = FALSE;
2149 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2150 /* state may have changed while we were unlocked. Loop again if so */
2151 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2152 return;
2153 }
2154
2155 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2156 gboolean ready = TRUE;
2157
2158 /* Iterate each pad, and check that the input running time is at least
2159 * up to the reference running time, and if so handle the collected GOP */
2160 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2161 GST_STIME_FORMAT " ctx %p",
2162 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2163 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2164 cur = g_list_next (cur)) {
2165 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2166
2167 GST_LOG_OBJECT (splitmux,
2168 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2169 " EOS %d", tmpctx, tmpctx->srcpad,
2170 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2171
2172 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2173 tmpctx->in_running_time < splitmux->max_in_running_time &&
2174 !tmpctx->in_eos) {
2175 GST_LOG_OBJECT (splitmux,
2176 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2177 tmpctx, tmpctx->srcpad);
2178 ready = FALSE;
2179 break;
2180 }
2181 }
2182 if (ready) {
2183 GST_DEBUG_OBJECT (splitmux,
2184 "Collected GOP is complete. Processing (ctx %p)", ctx);
2185 /* All pads have a complete GOP, release it into the multiqueue */
2186 handle_gathered_gop (splitmux);
2187
2188 /* The user has requested a split, we can split now that the previous GOP
2189 * has been collected to the correct location */
2190 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
2191 FALSE)) {
2192 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2193 }
2194 }
2195 }
2196
2197 /* If upstream reached EOS we are not expecting more data, no need to wait
2198 * here. */
2199 if (ctx->in_eos)
2200 return;
2201
2202 /* Some pad is not yet ready, or GOP is being pushed
2203 * either way, sleep and wait to get woken */
2204 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2205 !ctx->flushing &&
2206 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2207 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2208
2209 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2210 GST_SPLITMUX_WAIT_INPUT (splitmux);
2211 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2212 }
2213 }
2214
2215 static GstPadProbeReturn
handle_mq_input(GstPad * pad,GstPadProbeInfo * info,MqStreamCtx * ctx)2216 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2217 {
2218 GstSplitMuxSink *splitmux = ctx->splitmux;
2219 GstBuffer *buf;
2220 MqStreamBuf *buf_info = NULL;
2221 GstClockTime ts;
2222 gboolean loop_again;
2223 gboolean keyframe = FALSE;
2224
2225 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2226
2227 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2228 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2229 g_warning ("Buffer list handling not implemented");
2230 return GST_PAD_PROBE_DROP;
2231 }
2232 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2233 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2234 GstEvent *event = gst_pad_probe_info_get_event (info);
2235
2236 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2237
2238 switch (GST_EVENT_TYPE (event)) {
2239 case GST_EVENT_SEGMENT:
2240 gst_event_copy_segment (event, &ctx->in_segment);
2241 break;
2242 case GST_EVENT_FLUSH_STOP:
2243 GST_SPLITMUX_LOCK (splitmux);
2244 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2245 ctx->in_eos = FALSE;
2246 /* ohos.ext.func.0008
2247 * when GST_EVENT_FLUSH_STOP, ctx->in_running_time is set to none.
2248 * when GST_EVENT_EOS called after GST_EVENT_FLUSH_STOP, codes goes to handle_gathered_gop in
2249 * check_completed_gop. queued_gop_time is always below zero, and it failed.
2250 */
2251 #ifndef OHOS_EXT_FUNC
2252 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2253 #endif
2254 GST_SPLITMUX_UNLOCK (splitmux);
2255 break;
2256 case GST_EVENT_EOS:
2257 GST_SPLITMUX_LOCK (splitmux);
2258 ctx->in_eos = TRUE;
2259
2260 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2261 goto beach;
2262
2263 if (ctx->is_reference) {
2264 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2265 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2266 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2267 /* Wake up other input pads to collect this GOP */
2268 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2269 check_completed_gop (splitmux, ctx);
2270 } else if (splitmux->input_state ==
2271 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2272 /* If we are waiting for a GOP to be completed (ie, for aux
2273 * pads to catch up), then this pad is complete, so check
2274 * if the whole GOP is.
2275 */
2276 check_completed_gop (splitmux, ctx);
2277 }
2278 GST_SPLITMUX_UNLOCK (splitmux);
2279 break;
2280 case GST_EVENT_GAP:{
2281 GstClockTime gap_ts;
2282 GstClockTimeDiff rtime;
2283
2284 gst_event_parse_gap (event, &gap_ts, NULL);
2285 if (gap_ts == GST_CLOCK_TIME_NONE)
2286 break;
2287
2288 GST_SPLITMUX_LOCK (splitmux);
2289
2290 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2291 goto beach;
2292 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2293
2294 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2295 GST_STIME_ARGS (rtime));
2296
2297 if (ctx->is_reference
2298 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2299 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2300 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2301 GST_STIME_ARGS (splitmux->fragment_start_time));
2302 /* Also take this as the first start time when starting up,
2303 * so that we start counting overflow from the first frame */
2304 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2305 splitmux->max_in_running_time = splitmux->fragment_start_time;
2306 }
2307
2308 GST_SPLITMUX_UNLOCK (splitmux);
2309 break;
2310 }
2311 default:
2312 break;
2313 }
2314 return GST_PAD_PROBE_PASS;
2315 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2316 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2317 case GST_QUERY_ALLOCATION:
2318 return GST_PAD_PROBE_DROP;
2319 default:
2320 return GST_PAD_PROBE_PASS;
2321 }
2322 }
2323
2324 buf = gst_pad_probe_info_get_buffer (info);
2325 buf_info = mq_stream_buf_new ();
2326
2327 if (GST_BUFFER_PTS_IS_VALID (buf))
2328 ts = GST_BUFFER_PTS (buf);
2329 else
2330 ts = GST_BUFFER_DTS (buf);
2331
2332 #ifdef OHOS_OPT_COMPAT
2333 // ohos.opt.compat.0018
2334 // to avoid when buffer take too many time in src,
2335 // cause pst comes to clock none.
2336 GST_BUFFER_PTS (buf) = ts;
2337 GST_BUFFER_DTS (buf) = ts;
2338 #endif
2339
2340 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2341
2342 GST_SPLITMUX_LOCK (splitmux);
2343
2344 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2345 goto beach;
2346
2347 /* If this buffer has a timestamp, advance the input timestamp of the
2348 * stream */
2349 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2350 GstClockTimeDiff running_time =
2351 my_segment_to_running_time (&ctx->in_segment, ts);
2352
2353 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2354 GST_STIME_ARGS (running_time));
2355
2356 if (GST_CLOCK_STIME_IS_VALID (running_time)
2357 && running_time > ctx->in_running_time)
2358 ctx->in_running_time = running_time;
2359 }
2360
2361 /* Try to make sure we have a valid running time */
2362 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2363 ctx->in_running_time =
2364 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2365 }
2366
2367 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2368 GST_STIME_ARGS (ctx->in_running_time));
2369
2370 buf_info->run_ts = ctx->in_running_time;
2371 buf_info->buf_size = gst_buffer_get_size (buf);
2372 buf_info->duration = GST_BUFFER_DURATION (buf);
2373
2374 /* initialize fragment_start_time */
2375 if (ctx->is_reference
2376 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2377 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2378 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2379 GST_STIME_ARGS (splitmux->fragment_start_time));
2380 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2381
2382 /* Also take this as the first start time when starting up,
2383 * so that we start counting overflow from the first frame */
2384 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2385 splitmux->max_in_running_time = splitmux->fragment_start_time;
2386 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2387 GST_WARNING_OBJECT (splitmux,
2388 "Could not request a keyframe. Files may not split at the exact location they should");
2389 }
2390 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2391 }
2392
2393 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2394 " total GOP bytes %" G_GUINT64_FORMAT,
2395 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2396
2397 loop_again = TRUE;
2398 do {
2399 if (ctx->flushing)
2400 break;
2401
2402 switch (splitmux->input_state) {
2403 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2404 if (ctx->is_reference) {
2405 /* This is the reference context. If it's a keyframe,
2406 * it marks the start of a new GOP and we should wait in
2407 * check_completed_gop before continuing, but either way
2408 * (keyframe or no, we'll pass this buffer through after
2409 * so set loop_again to FALSE */
2410 loop_again = FALSE;
2411
2412 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2413 /* Allow other input pads to catch up to here too */
2414 splitmux->max_in_running_time = ctx->in_running_time;
2415 GST_LOG_OBJECT (splitmux,
2416 "Max in running time now %" GST_TIME_FORMAT,
2417 GST_TIME_ARGS (splitmux->max_in_running_time));
2418 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2419 break;
2420 }
2421 GST_INFO_OBJECT (pad,
2422 "Have keyframe with running time %" GST_STIME_FORMAT,
2423 GST_STIME_ARGS (ctx->in_running_time));
2424 keyframe = TRUE;
2425 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2426 splitmux->max_in_running_time = ctx->in_running_time;
2427 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2428 GST_TIME_ARGS (splitmux->max_in_running_time));
2429 /* Wake up other input pads to collect this GOP */
2430 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2431 check_completed_gop (splitmux, ctx);
2432 /* Store this new keyframe to remember the start of GOP */
2433 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2434 } else {
2435 /* Pass this buffer if the reference ctx is far enough ahead */
2436 if (ctx->in_running_time < splitmux->max_in_running_time) {
2437 loop_again = FALSE;
2438 break;
2439 }
2440
2441 /* We're still waiting for a keyframe on the reference pad, sleep */
2442 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2443 GST_SPLITMUX_WAIT_INPUT (splitmux);
2444 GST_LOG_OBJECT (pad,
2445 "Done sleeping for GOP start input state now %d",
2446 splitmux->input_state);
2447 }
2448 break;
2449 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2450 /* We're collecting a GOP. If this is the reference context,
2451 * we need to check if this is a keyframe that marks the start
2452 * of the next GOP. If it is, it marks the end of the GOP we're
2453 * collecting, so sleep and wait until all the other pads also
2454 * reach that timestamp - at which point, we have an entire GOP
2455 * and either go to ENDING_FILE or release this GOP to the muxer and
2456 * go back to COLLECT_GOP_START. */
2457
2458 /* If we overran the target timestamp, it might be time to process
2459 * the GOP, otherwise bail out for more data
2460 */
2461 GST_LOG_OBJECT (pad,
2462 "Checking TS %" GST_STIME_FORMAT " against max %"
2463 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2464 GST_STIME_ARGS (splitmux->max_in_running_time));
2465
2466 if (ctx->in_running_time < splitmux->max_in_running_time) {
2467 loop_again = FALSE;
2468 break;
2469 }
2470
2471 GST_LOG_OBJECT (pad,
2472 "Collected last packet of GOP. Checking other pads");
2473 check_completed_gop (splitmux, ctx);
2474 break;
2475 }
2476 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2477 loop_again = FALSE;
2478 break;
2479 default:
2480 loop_again = FALSE;
2481 break;
2482 }
2483 }
2484 while (loop_again);
2485
2486 if (keyframe) {
2487 splitmux->queued_keyframes++;
2488 buf_info->keyframe = TRUE;
2489 }
2490
2491 /* Update total input byte counter for overflow detect */
2492 splitmux->gop_total_bytes += buf_info->buf_size;
2493
2494 /* Now add this buffer to the queue just before returning */
2495 g_queue_push_head (&ctx->queued_bufs, buf_info);
2496
2497 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2498 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2499
2500 GST_SPLITMUX_UNLOCK (splitmux);
2501 return GST_PAD_PROBE_PASS;
2502
2503 beach:
2504 GST_SPLITMUX_UNLOCK (splitmux);
2505 if (buf_info)
2506 mq_stream_buf_free (buf_info);
2507 return GST_PAD_PROBE_PASS;
2508 }
2509
2510 static void
grow_blocked_queues(GstSplitMuxSink * splitmux)2511 grow_blocked_queues (GstSplitMuxSink * splitmux)
2512 {
2513 GList *cur;
2514
2515 /* Scan other queues for full-ness and grow them */
2516 for (cur = g_list_first (splitmux->contexts);
2517 cur != NULL; cur = g_list_next (cur)) {
2518 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2519 guint cur_limit;
2520 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2521
2522 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2523 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2524
2525 if (cur_len >= cur_limit) {
2526 cur_limit = cur_len + 1;
2527 GST_DEBUG_OBJECT (tmpctx->q,
2528 "Queue overflowed and needs enlarging. Growing to %u buffers",
2529 cur_limit);
2530 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2531 }
2532 }
2533 }
2534
2535 static void
handle_q_underrun(GstElement * q,gpointer user_data)2536 handle_q_underrun (GstElement * q, gpointer user_data)
2537 {
2538 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2539 GstSplitMuxSink *splitmux = ctx->splitmux;
2540
2541 GST_SPLITMUX_LOCK (splitmux);
2542 GST_DEBUG_OBJECT (q,
2543 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2544 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2545 grow_blocked_queues (splitmux);
2546 GST_SPLITMUX_UNLOCK (splitmux);
2547 }
2548
2549 static void
handle_q_overrun(GstElement * q,gpointer user_data)2550 handle_q_overrun (GstElement * q, gpointer user_data)
2551 {
2552 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2553 GstSplitMuxSink *splitmux = ctx->splitmux;
2554 gboolean allow_grow = FALSE;
2555
2556 GST_SPLITMUX_LOCK (splitmux);
2557 GST_DEBUG_OBJECT (q,
2558 "Queue reported overrun with %d keyframes and %d cmds enqueued",
2559 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2560
2561 if (splitmux->queued_keyframes < 2) {
2562 /* Less than a full GOP queued, grow the queue */
2563 allow_grow = TRUE;
2564 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2565 allow_grow = TRUE;
2566 } else {
2567 /* If another queue is starved, grow */
2568 GList *cur;
2569 for (cur = g_list_first (splitmux->contexts);
2570 cur != NULL; cur = g_list_next (cur)) {
2571 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2572 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2573 allow_grow = TRUE;
2574 }
2575 }
2576 }
2577 GST_SPLITMUX_UNLOCK (splitmux);
2578
2579 if (allow_grow) {
2580 guint cur_limit;
2581
2582 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2583 cur_limit++;
2584
2585 GST_DEBUG_OBJECT (q,
2586 "Queue overflowed and needs enlarging. Growing to %u buffers",
2587 cur_limit);
2588
2589 g_object_set (q, "max-size-buffers", cur_limit, NULL);
2590 }
2591 }
2592
2593 static GstPad *
gst_splitmux_sink_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)2594 gst_splitmux_sink_request_new_pad (GstElement * element,
2595 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2596 {
2597 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2598 GstPadTemplate *mux_template = NULL;
2599 GstPad *res = NULL;
2600 GstElement *q;
2601 GstPad *q_sink = NULL, *q_src = NULL;
2602 gchar *gname;
2603 gboolean is_video = FALSE;
2604 MqStreamCtx *ctx;
2605
2606 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2607
2608 GST_SPLITMUX_LOCK (splitmux);
2609 if (!create_muxer (splitmux))
2610 goto fail;
2611 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2612
2613 if (templ->name_template) {
2614 if (g_str_equal (templ->name_template, "video")) {
2615 if (splitmux->have_video)
2616 goto already_have_video;
2617
2618 /* FIXME: Look for a pad template with matching caps, rather than by name */
2619 GST_DEBUG_OBJECT (element,
2620 "searching for pad-template with name 'video_%%u'");
2621 mux_template =
2622 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2623 (splitmux->muxer), "video_%u");
2624
2625 /* Fallback to find sink pad templates named 'video' (flvmux) */
2626 if (!mux_template) {
2627 GST_DEBUG_OBJECT (element,
2628 "searching for pad-template with name 'video'");
2629 mux_template =
2630 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2631 (splitmux->muxer), "video");
2632 }
2633 is_video = TRUE;
2634 name = NULL;
2635 } else {
2636 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
2637 templ->name_template);
2638 mux_template =
2639 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2640 (splitmux->muxer), templ->name_template);
2641
2642 /* Fallback to find sink pad templates named 'audio' (flvmux) */
2643 if (!mux_template) {
2644 GST_DEBUG_OBJECT (element,
2645 "searching for pad-template with name 'audio'");
2646 mux_template =
2647 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2648 (splitmux->muxer), "audio");
2649 name = NULL;
2650 }
2651 }
2652
2653 if (mux_template == NULL) {
2654 GST_DEBUG_OBJECT (element,
2655 "searching for pad-template with name 'sink_%%d'");
2656 mux_template =
2657 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2658 (splitmux->muxer), "sink_%d");
2659 name = NULL;
2660 }
2661 if (mux_template == NULL) {
2662 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
2663 mux_template =
2664 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2665 (splitmux->muxer), "sink");
2666 name = NULL;
2667 }
2668 }
2669
2670 if (mux_template == NULL) {
2671 GST_ERROR_OBJECT (element,
2672 "unable to find a suitable sink pad-template on the muxer");
2673
2674 goto fail;
2675 }
2676 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
2677 mux_template->name_template);
2678
2679 if (mux_template->presence == GST_PAD_REQUEST) {
2680 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
2681
2682 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2683 if (res == NULL)
2684 goto fail;
2685 } else if (mux_template->presence == GST_PAD_ALWAYS) {
2686 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
2687
2688 res =
2689 gst_element_get_static_pad (splitmux->muxer,
2690 mux_template->name_template);
2691 if (res == NULL)
2692 goto fail;
2693 } else {
2694 GST_ERROR_OBJECT (element,
2695 "unexpected pad presence %d", mux_template->presence);
2696
2697 goto fail;
2698 }
2699
2700 if (is_video)
2701 gname = g_strdup ("video");
2702 else if (name == NULL)
2703 gname = gst_pad_get_name (res);
2704 else
2705 gname = g_strdup (name);
2706
2707 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2708 goto fail;
2709
2710 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2711
2712 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2713 "max-size-buffers", 5, NULL);
2714
2715 q_sink = gst_element_get_static_pad (q, "sink");
2716 q_src = gst_element_get_static_pad (q, "src");
2717
2718 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2719 gst_element_release_request_pad (splitmux->muxer, res);
2720 gst_object_unref (GST_OBJECT (res));
2721 goto fail;
2722 }
2723
2724 gst_object_unref (GST_OBJECT (res));
2725
2726 ctx = mq_stream_ctx_new (splitmux);
2727 /* Context holds a ref: */
2728 ctx->q = gst_object_ref (q);
2729 ctx->srcpad = q_src;
2730 ctx->sinkpad = q_sink;
2731 ctx->q_overrun_id =
2732 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2733 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2734
2735 ctx->src_pad_block_id =
2736 gst_pad_add_probe (q_src,
2737 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2738 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2739 if (is_video && splitmux->reference_ctx != NULL) {
2740 splitmux->reference_ctx->is_reference = FALSE;
2741 splitmux->reference_ctx = NULL;
2742 }
2743 if (splitmux->reference_ctx == NULL) {
2744 splitmux->reference_ctx = ctx;
2745 ctx->is_reference = TRUE;
2746 }
2747
2748 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2749 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2750
2751 ctx->sink_pad_block_id =
2752 gst_pad_add_probe (q_sink,
2753 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2754 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2755 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2756
2757 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2758 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2759
2760 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2761
2762 g_free (gname);
2763
2764 if (is_video)
2765 splitmux->have_video = TRUE;
2766
2767 gst_pad_set_active (res, TRUE);
2768 gst_element_add_pad (element, res);
2769
2770 GST_SPLITMUX_UNLOCK (splitmux);
2771
2772 return res;
2773 fail:
2774 GST_SPLITMUX_UNLOCK (splitmux);
2775
2776 if (q_sink)
2777 gst_object_unref (q_sink);
2778 if (q_src)
2779 gst_object_unref (q_src);
2780 return NULL;
2781 already_have_video:
2782 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2783 GST_SPLITMUX_UNLOCK (splitmux);
2784 return NULL;
2785 }
2786
2787 static void
gst_splitmux_sink_release_pad(GstElement * element,GstPad * pad)2788 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2789 {
2790 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2791 GstPad *muxpad = NULL;
2792 MqStreamCtx *ctx =
2793 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2794
2795 GST_SPLITMUX_LOCK (splitmux);
2796
2797 if (splitmux->muxer == NULL)
2798 goto fail; /* Elements don't exist yet - nothing to release */
2799
2800 GST_INFO_OBJECT (pad, "releasing request pad");
2801
2802 muxpad = gst_pad_get_peer (ctx->srcpad);
2803
2804 /* Remove the context from our consideration */
2805 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2806
2807 if (ctx->sink_pad_block_id)
2808 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2809
2810 if (ctx->src_pad_block_id)
2811 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2812
2813 /* Can release the context now */
2814 mq_stream_ctx_free (ctx);
2815 if (ctx == splitmux->reference_ctx)
2816 splitmux->reference_ctx = NULL;
2817
2818 /* Release and free the muxer input */
2819 if (muxpad) {
2820 gst_element_release_request_pad (splitmux->muxer, muxpad);
2821 gst_object_unref (muxpad);
2822 }
2823
2824 if (GST_PAD_PAD_TEMPLATE (pad) &&
2825 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2826 (pad)), "video"))
2827 splitmux->have_video = FALSE;
2828
2829 gst_element_remove_pad (element, pad);
2830
2831 /* Reset the internal elements only after all request pads are released */
2832 if (splitmux->contexts == NULL)
2833 gst_splitmux_reset (splitmux);
2834
2835 fail:
2836 GST_SPLITMUX_UNLOCK (splitmux);
2837 }
2838
2839 static GstElement *
create_element(GstSplitMuxSink * splitmux,const gchar * factory,const gchar * name,gboolean locked)2840 create_element (GstSplitMuxSink * splitmux,
2841 const gchar * factory, const gchar * name, gboolean locked)
2842 {
2843 GstElement *ret = gst_element_factory_make (factory, name);
2844 if (ret == NULL) {
2845 g_warning ("Failed to create %s - splitmuxsink will not work", name);
2846 return NULL;
2847 }
2848
2849 if (locked) {
2850 /* Ensure the sink starts in locked state and NULL - it will be changed
2851 * by the filename setting code */
2852 gst_element_set_locked_state (ret, TRUE);
2853 gst_element_set_state (ret, GST_STATE_NULL);
2854 }
2855
2856 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2857 g_warning ("Could not add %s element - splitmuxsink will not work", name);
2858 gst_object_unref (ret);
2859 return NULL;
2860 }
2861
2862 return ret;
2863 }
2864
2865 static gboolean
create_muxer(GstSplitMuxSink * splitmux)2866 create_muxer (GstSplitMuxSink * splitmux)
2867 {
2868 /* Create internal elements */
2869 if (splitmux->muxer == NULL) {
2870 GstElement *provided_muxer = NULL;
2871
2872 GST_OBJECT_LOCK (splitmux);
2873 if (splitmux->provided_muxer != NULL)
2874 provided_muxer = gst_object_ref (splitmux->provided_muxer);
2875 GST_OBJECT_UNLOCK (splitmux);
2876
2877 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2878 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2879 if ((splitmux->muxer =
2880 create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2881 goto fail;
2882 } else if (splitmux->async_finalize) {
2883 if ((splitmux->muxer =
2884 create_element (splitmux, splitmux->muxer_factory, "muxer",
2885 FALSE)) == NULL)
2886 goto fail;
2887 if (splitmux->muxer_properties)
2888 gst_structure_foreach (splitmux->muxer_properties,
2889 _set_property_from_structure, splitmux->muxer);
2890 } else {
2891 /* Ensure it's not in locked state (we might be reusing an old element) */
2892 gst_element_set_locked_state (provided_muxer, FALSE);
2893 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2894 g_warning ("Could not add muxer element - splitmuxsink will not work");
2895 gst_object_unref (provided_muxer);
2896 goto fail;
2897 }
2898
2899 splitmux->muxer = provided_muxer;
2900 gst_object_unref (provided_muxer);
2901 }
2902
2903 if (splitmux->use_robust_muxing) {
2904 update_muxer_properties (splitmux);
2905 }
2906 }
2907
2908 return TRUE;
2909 fail:
2910 return FALSE;
2911 }
2912
2913 static GstElement *
find_sink(GstElement * e)2914 find_sink (GstElement * e)
2915 {
2916 GstElement *res = NULL;
2917 GstIterator *iter;
2918 gboolean done = FALSE;
2919 GValue data = { 0, };
2920
2921 if (!GST_IS_BIN (e))
2922 return e;
2923
2924 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2925 return e;
2926
2927 iter = gst_bin_iterate_sinks (GST_BIN (e));
2928 while (!done) {
2929 switch (gst_iterator_next (iter, &data)) {
2930 case GST_ITERATOR_OK:
2931 {
2932 GstElement *child = g_value_get_object (&data);
2933 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2934 "location") != NULL) {
2935 res = child;
2936 done = TRUE;
2937 }
2938 g_value_reset (&data);
2939 break;
2940 }
2941 case GST_ITERATOR_RESYNC:
2942 gst_iterator_resync (iter);
2943 break;
2944 case GST_ITERATOR_DONE:
2945 done = TRUE;
2946 break;
2947 case GST_ITERATOR_ERROR:
2948 g_assert_not_reached ();
2949 break;
2950 }
2951 }
2952 g_value_unset (&data);
2953 gst_iterator_free (iter);
2954
2955 return res;
2956 }
2957
2958 static gboolean
create_sink(GstSplitMuxSink * splitmux)2959 create_sink (GstSplitMuxSink * splitmux)
2960 {
2961 GstElement *provided_sink = NULL;
2962
2963 if (splitmux->active_sink == NULL) {
2964
2965 GST_OBJECT_LOCK (splitmux);
2966 if (splitmux->provided_sink != NULL)
2967 provided_sink = gst_object_ref (splitmux->provided_sink);
2968 GST_OBJECT_UNLOCK (splitmux);
2969
2970 if ((!splitmux->async_finalize && provided_sink == NULL) ||
2971 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2972 if ((splitmux->sink =
2973 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2974 goto fail;
2975 splitmux->active_sink = splitmux->sink;
2976 } else if (splitmux->async_finalize) {
2977 if ((splitmux->sink =
2978 create_element (splitmux, splitmux->sink_factory, "sink",
2979 TRUE)) == NULL)
2980 goto fail;
2981 if (splitmux->sink_properties)
2982 gst_structure_foreach (splitmux->sink_properties,
2983 _set_property_from_structure, splitmux->sink);
2984 splitmux->active_sink = splitmux->sink;
2985 } else {
2986 /* Ensure the sink starts in locked state and NULL - it will be changed
2987 * by the filename setting code */
2988 gst_element_set_locked_state (provided_sink, TRUE);
2989 gst_element_set_state (provided_sink, GST_STATE_NULL);
2990 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2991 g_warning ("Could not add sink elements - splitmuxsink will not work");
2992 gst_object_unref (provided_sink);
2993 goto fail;
2994 }
2995
2996 splitmux->active_sink = provided_sink;
2997
2998 /* The bin holds a ref now, we can drop our tmp ref */
2999 gst_object_unref (provided_sink);
3000
3001 /* Find the sink element */
3002 splitmux->sink = find_sink (splitmux->active_sink);
3003 if (splitmux->sink == NULL) {
3004 g_warning
3005 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3006 goto fail;
3007 }
3008 }
3009
3010 #if 1
3011 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3012 "async") != NULL) {
3013 /* async child elements are causing state change races and weird
3014 * failures, so let's try and turn that off */
3015 g_object_set (splitmux->sink, "async", FALSE, NULL);
3016 }
3017 #endif
3018
3019 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3020 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3021 goto fail;
3022 }
3023 }
3024
3025 return TRUE;
3026 fail:
3027 return FALSE;
3028 }
3029
3030 #ifdef __GNUC__
3031 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3032 #endif
3033 static void
set_next_filename(GstSplitMuxSink * splitmux,MqStreamCtx * ctx)3034 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3035 {
3036 gchar *fname = NULL;
3037 GstSample *sample;
3038 GstCaps *caps;
3039
3040 gst_splitmux_sink_ensure_max_files (splitmux);
3041
3042 if (ctx->cur_out_buffer == NULL) {
3043 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3044 }
3045
3046 caps = gst_pad_get_current_caps (ctx->srcpad);
3047 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3048 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3049 splitmux->fragment_id, sample, &fname);
3050 gst_sample_unref (sample);
3051 if (caps)
3052 gst_caps_unref (caps);
3053
3054 if (fname == NULL) {
3055 /* Fallback to the old signal if the new one returned nothing */
3056 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3057 splitmux->fragment_id, &fname);
3058 }
3059
3060 if (!fname)
3061 fname = splitmux->location ?
3062 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3063
3064 if (fname) {
3065 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3066 g_object_set (splitmux->sink, "location", fname, NULL);
3067 g_free (fname);
3068
3069 splitmux->fragment_id++;
3070 }
3071 }
3072
3073 static void
do_async_start(GstSplitMuxSink * splitmux)3074 do_async_start (GstSplitMuxSink * splitmux)
3075 {
3076 GstMessage *message;
3077
3078 if (!splitmux->need_async_start) {
3079 GST_INFO_OBJECT (splitmux, "no async_start needed");
3080 return;
3081 }
3082
3083 splitmux->async_pending = TRUE;
3084
3085 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3086 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3087 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3088 (splitmux), message);
3089 }
3090
3091 static void
do_async_done(GstSplitMuxSink * splitmux)3092 do_async_done (GstSplitMuxSink * splitmux)
3093 {
3094 GstMessage *message;
3095
3096 if (splitmux->async_pending) {
3097 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3098 message =
3099 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3100 GST_CLOCK_TIME_NONE);
3101 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3102 (splitmux), message);
3103
3104 splitmux->async_pending = FALSE;
3105 }
3106
3107 splitmux->need_async_start = FALSE;
3108 }
3109
3110 static GstStateChangeReturn
gst_splitmux_sink_change_state(GstElement * element,GstStateChange transition)3111 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3112 {
3113 GstStateChangeReturn ret;
3114 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3115
3116 switch (transition) {
3117 case GST_STATE_CHANGE_NULL_TO_READY:{
3118 GST_SPLITMUX_LOCK (splitmux);
3119 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3120 ret = GST_STATE_CHANGE_FAILURE;
3121 GST_SPLITMUX_UNLOCK (splitmux);
3122 goto beach;
3123 }
3124 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3125 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3126 GST_SPLITMUX_UNLOCK (splitmux);
3127 splitmux->fragment_id = 0;
3128 break;
3129 }
3130 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3131 GST_SPLITMUX_LOCK (splitmux);
3132 /* Start by collecting one input on each pad */
3133 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3134 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3135 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3136 splitmux->gop_start_time = splitmux->fragment_start_time =
3137 GST_CLOCK_STIME_NONE;
3138 splitmux->muxed_out_bytes = 0;
3139 splitmux->ready_for_output = FALSE;
3140 GST_SPLITMUX_UNLOCK (splitmux);
3141 break;
3142 }
3143 case GST_STATE_CHANGE_PAUSED_TO_READY:
3144 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3145 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3146 case GST_STATE_CHANGE_READY_TO_NULL:
3147 GST_SPLITMUX_LOCK (splitmux);
3148 gst_queue_array_clear (splitmux->times_to_split);
3149 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3150 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3151 /* Wake up any blocked threads */
3152 GST_LOG_OBJECT (splitmux,
3153 "State change -> NULL or READY. Waking threads");
3154 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3155 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3156 GST_SPLITMUX_UNLOCK (splitmux);
3157 break;
3158 default:
3159 break;
3160 }
3161
3162 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3163 if (ret == GST_STATE_CHANGE_FAILURE)
3164 goto beach;
3165
3166 switch (transition) {
3167 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3168 splitmux->need_async_start = TRUE;
3169 break;
3170 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3171 /* Change state async, because our child sink might not
3172 * be ready to do that for us yet if it's state is still locked */
3173
3174 splitmux->need_async_start = TRUE;
3175 /* we want to go async to PAUSED until we managed to configure and add the
3176 * sink */
3177 GST_SPLITMUX_LOCK (splitmux);
3178 do_async_start (splitmux);
3179 GST_SPLITMUX_UNLOCK (splitmux);
3180 ret = GST_STATE_CHANGE_ASYNC;
3181 break;
3182 }
3183 case GST_STATE_CHANGE_READY_TO_NULL:
3184 GST_SPLITMUX_LOCK (splitmux);
3185 splitmux->fragment_id = 0;
3186 /* Reset internal elements only if no pad contexts are using them */
3187 if (splitmux->contexts == NULL)
3188 gst_splitmux_reset (splitmux);
3189 do_async_done (splitmux);
3190 GST_SPLITMUX_UNLOCK (splitmux);
3191 break;
3192 default:
3193 break;
3194 }
3195
3196 beach:
3197 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3198 ret == GST_STATE_CHANGE_FAILURE) {
3199 /* Cleanup elements on failed transition out of NULL */
3200 gst_splitmux_reset (splitmux);
3201 GST_SPLITMUX_LOCK (splitmux);
3202 do_async_done (splitmux);
3203 GST_SPLITMUX_UNLOCK (splitmux);
3204 }
3205 return ret;
3206 }
3207
3208 gboolean
register_splitmuxsink(GstPlugin * plugin)3209 register_splitmuxsink (GstPlugin * plugin)
3210 {
3211 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3212 "Split File Muxing Sink");
3213
3214 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3215 GST_TYPE_SPLITMUX_SINK);
3216 }
3217
3218 static void
gst_splitmux_sink_ensure_max_files(GstSplitMuxSink * splitmux)3219 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3220 {
3221 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3222 splitmux->fragment_id = 0;
3223 }
3224 }
3225
3226 static void
split_now(GstSplitMuxSink * splitmux)3227 split_now (GstSplitMuxSink * splitmux)
3228 {
3229 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3230 }
3231
3232 static void
split_after(GstSplitMuxSink * splitmux)3233 split_after (GstSplitMuxSink * splitmux)
3234 {
3235 g_atomic_int_set (&(splitmux->split_requested), TRUE);
3236 }
3237
3238 static void
split_at_running_time(GstSplitMuxSink * splitmux,GstClockTime split_time)3239 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3240 {
3241 gboolean send_keyframe_requests;
3242
3243 GST_SPLITMUX_LOCK (splitmux);
3244 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3245 send_keyframe_requests = splitmux->send_keyframe_requests;
3246 GST_SPLITMUX_UNLOCK (splitmux);
3247
3248 if (send_keyframe_requests) {
3249 GstEvent *ev =
3250 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3251 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3252 GST_TIME_ARGS (split_time));
3253 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3254 GST_WARNING_OBJECT (splitmux,
3255 "Could not request keyframe at %" GST_TIME_FORMAT,
3256 GST_TIME_ARGS (split_time));
3257 }
3258 }
3259 }
3260