1 /* GStreamer Split Demuxer bin that recombines files created by
2 * the splitmuxsink element.
3 *
4 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
/* Debug category used as the default for the GST_*_OBJECT logging calls
 * in this file */
GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
#define GST_CAT_DEFAULT splitmux_part_debug

/* Take / release the reader's main mutex ('lock') */
#define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
#define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
/* Wait on / wake up waiters of 'inactive_cond'. The wait is paired with the
 * reader's main mutex, so it must be entered with that mutex held. */
#define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
#define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)

/* Take / release the reader's 'type_lock' mutex */
#define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
#define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)

/* Take / release the reader's 'msg_lock' mutex, which is held around the
 * async-start / async-done message handling in this file */
#define SPLITMUX_PART_MSG_LOCK(p) g_mutex_lock(&(p)->msg_lock)
#define SPLITMUX_PART_MSG_UNLOCK(p) g_mutex_unlock(&(p)->msg_lock)
42
/* Sink pad created by a part reader for each demuxer source pad. Buffers and
 * events arriving on it are either used for measuring the part or pushed
 * into 'queue'. */
typedef struct _GstSplitMuxPartPad
{
  GstPad parent;

  /* Reader we belong to */
  GstSplitMuxPartReader *reader;
  /* Output splitmuxsrc source pad */
  GstPad *target;

  /* Data queue that the chain and event functions push items into */
  GstDataQueue *queue;

  /* Set when EOS arrives while measuring; cleared again on flush-stop */
  gboolean is_eos;
  /* TRUE between flush-start and flush-stop on this pad */
  gboolean flushing;
  /* Set once a buffer has been seen while measuring; cleared on flush-stop */
  gboolean seen_buffer;

  /* Whether the stream-start event carried GST_STREAM_FLAG_SPARSE */
  gboolean is_sparse;
  /* Largest end timestamp recorded while measuring the stream */
  GstClockTime max_ts;
  /* Last received segment, after adjustment by the reader's offsets */
  GstSegment segment;

  /* Last received segment, exactly as it arrived */
  GstSegment orig_segment;
  /* start + base - time of orig_segment, captured on the first buffer seen
   * in the collect-streams state */
  GstClockTime initial_ts_offset;
} GstSplitMuxPartPad;
65
/* Class structure for GstSplitMuxPartPad; adds nothing to GstPadClass */
typedef struct _GstSplitMuxPartPadClass
{
  GstPadClass parent;
} GstSplitMuxPartPadClass;
70
/* GType accessor for the part pad type, plus convenience macros */
static GType gst_splitmux_part_pad_get_type (void);
#define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
/* Unchecked cast to GstSplitMuxPartPad */
#define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))

/* Forward declarations of functions that are used before their definition */
static void splitmux_part_pad_constructed (GObject * pad);
static void splitmux_part_pad_finalize (GObject * pad);
static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
    GstSplitMuxPartPad * part_pad, GstBuffer * buf);

static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
    guint visible, guint bytes, guint64 time, gpointer checkdata);
static void type_found (GstElement * typefind, guint probability,
    GstCaps * caps, GstSplitMuxPartReader * reader);
static void check_if_pads_collected (GstSplitMuxPartReader * reader);

static void
gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
    reader);
89
90 /* Called with reader lock held */
91 static gboolean
have_empty_queue(GstSplitMuxPartReader * reader)92 have_empty_queue (GstSplitMuxPartReader * reader)
93 {
94 GList *cur;
95
96 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
97 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
98 if (part_pad->is_eos) {
99 GST_LOG_OBJECT (part_pad, "Pad is EOS");
100 return TRUE;
101 }
102 if (gst_data_queue_is_empty (part_pad->queue)) {
103 GST_LOG_OBJECT (part_pad, "Queue is empty");
104 return TRUE;
105 }
106 }
107
108 return FALSE;
109 }
110
111 /* Called with reader lock held */
112 static gboolean
block_until_can_push(GstSplitMuxPartReader * reader)113 block_until_can_push (GstSplitMuxPartReader * reader)
114 {
115 while (reader->running) {
116 if (reader->flushing)
117 goto out;
118 if (reader->active && have_empty_queue (reader))
119 goto out;
120
121 GST_LOG_OBJECT (reader,
122 "Waiting for activation or empty queue on reader %s", reader->path);
123 SPLITMUX_PART_WAIT (reader);
124 }
125
126 GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
127 reader->path, reader->active, reader->flushing);
128 out:
129 return reader->active && !reader->flushing;
130 }
131
132 static void
handle_buffer_measuring(GstSplitMuxPartReader * reader,GstSplitMuxPartPad * part_pad,GstBuffer * buf)133 handle_buffer_measuring (GstSplitMuxPartReader * reader,
134 GstSplitMuxPartPad * part_pad, GstBuffer * buf)
135 {
136 GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
137 GstClockTimeDiff offset;
138
139 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
140 !part_pad->seen_buffer) {
141 /* If this is the first buffer on the pad in the collect_streams state,
142 * then calculate initial offset based on running time of this segment */
143 part_pad->initial_ts_offset =
144 part_pad->orig_segment.start + part_pad->orig_segment.base -
145 part_pad->orig_segment.time;
146 GST_DEBUG_OBJECT (reader,
147 "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
148 part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
149 }
150 part_pad->seen_buffer = TRUE;
151
152 /* Adjust buffer timestamps */
153 offset = reader->start_offset + part_pad->segment.base;
154 offset -= part_pad->initial_ts_offset;
155 /* We don't add the ts_offset here, because we
156 * want to measure the logical length of the stream,
157 * not to generate output timestamps */
158
159 /* Update the stored max duration on the pad,
160 * always preferring making DTS contiguous
161 * where possible */
162 if (GST_BUFFER_DTS_IS_VALID (buf))
163 ts = GST_BUFFER_DTS (buf) + offset;
164 else if (GST_BUFFER_PTS_IS_VALID (buf))
165 ts = GST_BUFFER_PTS (buf) + offset;
166
167 GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
168 " incoming DTS %" GST_TIME_FORMAT
169 " PTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
170 " to %" GST_STIME_FORMAT, part_pad,
171 GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
172 GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
173 GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
174
175 if (GST_CLOCK_STIME_IS_VALID (ts)) {
176 if (GST_BUFFER_DURATION_IS_VALID (buf))
177 ts += GST_BUFFER_DURATION (buf);
178
179 if (GST_CLOCK_STIME_IS_VALID (ts)
180 && ts > (GstClockTimeDiff) part_pad->max_ts) {
181 part_pad->max_ts = ts;
182 GST_LOG_OBJECT (reader,
183 "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
184 GST_TIME_ARGS (part_pad->max_ts));
185 }
186 }
187 /* Is it time to move to measuring state yet? */
188 check_if_pads_collected (reader);
189 }
190
191 static gboolean
splitmux_data_queue_is_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)192 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
193 guint visible, guint bytes, guint64 time, gpointer checkdata)
194 {
195 /* Arbitrary safety limit. If we hit it, playback is likely to stall */
196 if (time > 20 * GST_SECOND)
197 return TRUE;
198 return FALSE;
199 }
200
201 static void
splitmux_part_free_queue_item(GstDataQueueItem * item)202 splitmux_part_free_queue_item (GstDataQueueItem * item)
203 {
204 gst_mini_object_unref (item->object);
205 g_slice_free (GstDataQueueItem, item);
206 }
207
208 static GstFlowReturn
splitmux_part_pad_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)209 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
210 {
211 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
212 GstSplitMuxPartReader *reader = part_pad->reader;
213 GstDataQueueItem *item;
214 GstClockTimeDiff offset;
215
216 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
217 SPLITMUX_PART_LOCK (reader);
218
219 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
220 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
221 handle_buffer_measuring (reader, part_pad, buf);
222 gst_buffer_unref (buf);
223 SPLITMUX_PART_UNLOCK (reader);
224 return GST_FLOW_OK;
225 }
226
227 if (!block_until_can_push (reader)) {
228 /* Flushing */
229 SPLITMUX_PART_UNLOCK (reader);
230 gst_buffer_unref (buf);
231 return GST_FLOW_FLUSHING;
232 }
233
234 /* Adjust buffer timestamps */
235 offset = reader->start_offset + part_pad->segment.base;
236 offset -= part_pad->initial_ts_offset;
237 offset += reader->ts_offset;
238
239 if (GST_BUFFER_PTS_IS_VALID (buf))
240 GST_BUFFER_PTS (buf) += offset;
241 if (GST_BUFFER_DTS_IS_VALID (buf))
242 GST_BUFFER_DTS (buf) += offset;
243
244 /* We are active, and one queue is empty, place this buffer in
245 * the dataqueue */
246 GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
247 item = g_slice_new (GstDataQueueItem);
248 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
249 item->object = GST_MINI_OBJECT (buf);
250 item->size = gst_buffer_get_size (buf);
251 item->duration = GST_BUFFER_DURATION (buf);
252 if (item->duration == GST_CLOCK_TIME_NONE)
253 item->duration = 0;
254 item->visible = TRUE;
255
256 gst_object_ref (part_pad);
257
258 SPLITMUX_PART_UNLOCK (reader);
259
260 if (!gst_data_queue_push (part_pad->queue, item)) {
261 splitmux_part_free_queue_item (item);
262 gst_object_unref (part_pad);
263 return GST_FLOW_FLUSHING;
264 }
265
266 gst_object_unref (part_pad);
267 return GST_FLOW_OK;
268 }
269
270 /* Called with splitmux part lock held */
271 static gboolean
splitmux_part_is_eos_locked(GstSplitMuxPartReader * part)272 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
273 {
274 GList *cur;
275 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
276 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
277 if (!part_pad->is_eos)
278 return FALSE;
279 }
280
281 return TRUE;
282 }
283
284 static gboolean
splitmux_part_is_prerolled_locked(GstSplitMuxPartReader * part)285 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
286 {
287 GList *cur;
288 GST_LOG_OBJECT (part, "Checking for preroll");
289 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
290 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
291 if (!part_pad->seen_buffer) {
292 GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
293 part_pad);
294 return FALSE;
295 }
296 }
297 GST_LOG_OBJECT (part, "Part is prerolled");
298 return TRUE;
299 }
300
301
302 gboolean
gst_splitmux_part_is_eos(GstSplitMuxPartReader * reader)303 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
304 {
305 gboolean res;
306
307 SPLITMUX_PART_LOCK (reader);
308 res = splitmux_part_is_eos_locked (reader);
309 SPLITMUX_PART_UNLOCK (reader);
310
311 return res;
312 }
313
314 /* Called with splitmux part lock held */
315 static gboolean
splitmux_is_flushing(GstSplitMuxPartReader * reader)316 splitmux_is_flushing (GstSplitMuxPartReader * reader)
317 {
318 GList *cur;
319 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
320 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
321 if (part_pad->flushing)
322 return TRUE;
323 }
324
325 return FALSE;
326 }
327
328 static gboolean
splitmux_part_pad_event(GstPad * pad,GstObject * parent,GstEvent * event)329 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
330 {
331 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
332 GstSplitMuxPartReader *reader = part_pad->reader;
333 gboolean ret = TRUE;
334 SplitMuxSrcPad *target;
335 GstDataQueueItem *item;
336
337 SPLITMUX_PART_LOCK (reader);
338
339 target = gst_object_ref (part_pad->target);
340
341 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
342 event);
343
344 if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
345 goto drop_event;
346
347 switch (GST_EVENT_TYPE (event)) {
348 case GST_EVENT_STREAM_START:{
349 GstStreamFlags flags;
350 gst_event_parse_stream_flags (event, &flags);
351 part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
352 break;
353 }
354 case GST_EVENT_SEGMENT:{
355 GstSegment *seg = &part_pad->segment;
356
357 GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
358
359 gst_event_copy_segment (event, seg);
360 gst_event_copy_segment (event, &part_pad->orig_segment);
361
362 if (seg->format != GST_FORMAT_TIME)
363 goto wrong_segment;
364
365 /* Adjust segment */
366 /* Adjust start/stop so the overall file is 0 + start_offset based,
367 * adding a fixed offset so that DTS is never negative */
368 if (seg->stop != -1) {
369 seg->stop -= seg->start;
370 seg->stop += seg->time + reader->start_offset + reader->ts_offset;
371 }
372 seg->start = seg->time + reader->start_offset + reader->ts_offset;
373 seg->time += reader->start_offset;
374 seg->position += reader->start_offset;
375
376 /* Replace event */
377 gst_event_unref (event);
378 event = gst_event_new_segment (seg);
379
380 GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
381
382 if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
383 && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
384 break; /* Only do further stuff with segments during initial measuring */
385
386 /* Take the first segment from the first part */
387 if (target->segment.format == GST_FORMAT_UNDEFINED) {
388 gst_segment_copy_into (seg, &target->segment);
389 GST_DEBUG_OBJECT (reader,
390 "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
391 }
392
393 if (seg->stop != -1 && target->segment.stop != -1) {
394 GstClockTime stop = seg->base + seg->stop;
395 if (stop > target->segment.stop) {
396 target->segment.stop = stop;
397 GST_DEBUG_OBJECT (reader,
398 "Adjusting segment stop by %" GST_TIME_FORMAT
399 " output now %" GST_SEGMENT_FORMAT,
400 GST_TIME_ARGS (reader->start_offset), &target->segment);
401 }
402 }
403 GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
404 break;
405 }
406 case GST_EVENT_EOS:{
407
408 GST_DEBUG_OBJECT (part_pad,
409 "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
410 reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
411
412 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
413 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
414 /* Mark this pad as EOS */
415 part_pad->is_eos = TRUE;
416 if (splitmux_part_is_eos_locked (reader)) {
417 /* Finished measuring things, set state and tell the state change func
418 * so it can seek back to the start */
419 GST_LOG_OBJECT (reader,
420 "EOS while measuring streams. Resetting for ready");
421 reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
422
423 gst_element_call_async (GST_ELEMENT_CAST (reader),
424 (GstElementCallAsyncFunc)
425 gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
426 }
427 goto drop_event;
428 }
429 break;
430 }
431 case GST_EVENT_FLUSH_START:
432 reader->flushing = TRUE;
433 part_pad->flushing = TRUE;
434 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
435 part_pad);
436 gst_data_queue_set_flushing (part_pad->queue, TRUE);
437 SPLITMUX_PART_BROADCAST (reader);
438 break;
439 case GST_EVENT_FLUSH_STOP:{
440 gst_data_queue_set_flushing (part_pad->queue, FALSE);
441 gst_data_queue_flush (part_pad->queue);
442 part_pad->seen_buffer = FALSE;
443 part_pad->flushing = FALSE;
444 part_pad->is_eos = FALSE;
445
446 reader->flushing = splitmux_is_flushing (reader);
447 GST_LOG_OBJECT (reader,
448 "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
449 reader->path, pad, reader->flushing);
450 SPLITMUX_PART_BROADCAST (reader);
451 break;
452 }
453 default:
454 break;
455 }
456
457 /* Don't send events downstream while preparing */
458 if (reader->prep_state != PART_STATE_READY)
459 goto drop_event;
460
461 /* Don't pass flush events - those are done by the parent */
462 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
463 GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
464 goto drop_event;
465
466 if (!block_until_can_push (reader))
467 goto drop_event;
468
469 switch (GST_EVENT_TYPE (event)) {
470 case GST_EVENT_GAP:{
471 /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
472 goto drop_event;
473 }
474 default:
475 break;
476 }
477
478 /* We are active, and one queue is empty, place this buffer in
479 * the dataqueue */
480 gst_object_ref (part_pad->queue);
481 SPLITMUX_PART_UNLOCK (reader);
482
483 GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
484 item = g_slice_new (GstDataQueueItem);
485 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
486 item->object = GST_MINI_OBJECT (event);
487 item->size = 0;
488 item->duration = 0;
489 if (item->duration == GST_CLOCK_TIME_NONE)
490 item->duration = 0;
491 item->visible = FALSE;
492
493 if (!gst_data_queue_push (part_pad->queue, item)) {
494 splitmux_part_free_queue_item (item);
495 ret = FALSE;
496 }
497
498 gst_object_unref (part_pad->queue);
499 gst_object_unref (target);
500
501 return ret;
502 wrong_segment:
503 gst_event_unref (event);
504 gst_object_unref (target);
505 SPLITMUX_PART_UNLOCK (reader);
506 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
507 ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
508 reader->path, pad));
509 return FALSE;
510 drop_event:
511 GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
512 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
513 gst_event_unref (event);
514 gst_object_unref (target);
515 SPLITMUX_PART_UNLOCK (reader);
516 return TRUE;
517 }
518
519 static gboolean
splitmux_part_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)520 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
521 {
522 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
523 GstSplitMuxPartReader *reader = part_pad->reader;
524 GstPad *target;
525 gboolean ret = FALSE;
526 gboolean active;
527
528 SPLITMUX_PART_LOCK (reader);
529 target = gst_object_ref (part_pad->target);
530 active = reader->active;
531 SPLITMUX_PART_UNLOCK (reader);
532
533 if (active) {
534 GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
535 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
536
537 ret = gst_pad_query (target, query);
538 }
539
540 gst_object_unref (target);
541
542 return ret;
543 }
544
/* Defines the GstSplitMuxPartPad type as a subclass of GstPad; this provides
 * gst_splitmux_part_pad_get_type() and gst_splitmux_part_pad_parent_class */
G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
546
547 static void
splitmux_part_pad_constructed(GObject * pad)548 splitmux_part_pad_constructed (GObject * pad)
549 {
550 gst_pad_set_chain_function (GST_PAD (pad),
551 GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
552 gst_pad_set_event_function (GST_PAD (pad),
553 GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
554 gst_pad_set_query_function (GST_PAD (pad),
555 GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
556
557 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
558 }
559
560 static void
gst_splitmux_part_pad_class_init(GstSplitMuxPartPadClass * klass)561 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
562 {
563 GObjectClass *gobject_klass = (GObjectClass *) (klass);
564
565 gobject_klass->constructed = splitmux_part_pad_constructed;
566 gobject_klass->finalize = splitmux_part_pad_finalize;
567 }
568
569 static void
gst_splitmux_part_pad_init(GstSplitMuxPartPad * pad)570 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
571 {
572 pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
573 NULL, NULL, pad);
574 gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
575 gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
576 }
577
578 static void
splitmux_part_pad_finalize(GObject * obj)579 splitmux_part_pad_finalize (GObject * obj)
580 {
581 GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
582
583 GST_DEBUG_OBJECT (obj, "finalize");
584 gst_data_queue_set_flushing (pad->queue, TRUE);
585 gst_data_queue_flush (pad->queue);
586 gst_object_unref (GST_OBJECT_CAST (pad->queue));
587 pad->queue = NULL;
588
589 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
590 }
591
/* Forward declarations for the part reader implementation below */
static void
new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
    GstSplitMuxPartReader * part);
static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
static GstStateChangeReturn
gst_splitmux_part_reader_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
    GstEvent * event);
static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
    * part, gboolean flushing);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void splitmux_part_reader_dispose (GObject * object);
static void splitmux_part_reader_finalize (GObject * object);
static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);

/* The reader is a GstPipeline subclass. The generated parent class pointer
 * is made available under the shorter name 'parent_class'. */
#define gst_splitmux_part_reader_parent_class parent_class
G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
    GST_TYPE_PIPELINE);
611
612 static void
gst_splitmux_part_reader_class_init(GstSplitMuxPartReaderClass * klass)613 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
614 {
615 GObjectClass *gobject_klass = (GObjectClass *) (klass);
616 GstElementClass *gstelement_class = (GstElementClass *) klass;
617 GstBinClass *gstbin_class = (GstBinClass *) klass;
618
619 GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
620 "Split File Demuxing Source helper");
621
622 gobject_klass->dispose = splitmux_part_reader_dispose;
623 gobject_klass->finalize = splitmux_part_reader_finalize;
624
625 gstelement_class->change_state = gst_splitmux_part_reader_change_state;
626 gstelement_class->send_event = gst_splitmux_part_reader_send_event;
627
628 gstbin_class->handle_message = bus_handler;
629 }
630
631 static void
gst_splitmux_part_reader_init(GstSplitMuxPartReader * reader)632 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
633 {
634 GstElement *typefind;
635
636 reader->active = FALSE;
637 reader->duration = GST_CLOCK_TIME_NONE;
638
639 g_cond_init (&reader->inactive_cond);
640 g_mutex_init (&reader->lock);
641 g_mutex_init (&reader->type_lock);
642 g_mutex_init (&reader->msg_lock);
643
644 /* FIXME: Create elements on a state change */
645 reader->src = gst_element_factory_make ("filesrc", NULL);
646 if (reader->src == NULL) {
647 GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
648 return;
649 }
650 gst_bin_add (GST_BIN_CAST (reader), reader->src);
651
652 typefind = gst_element_factory_make ("typefind", NULL);
653 if (!typefind) {
654 GST_ERROR_OBJECT (reader,
655 "Failed to create typefind element - check your installation");
656 return;
657 }
658
659 gst_bin_add (GST_BIN_CAST (reader), typefind);
660 reader->typefind = typefind;
661
662 if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
663 GST_ERROR_OBJECT (reader,
664 "Failed to link typefind element - check your installation");
665 return;
666 }
667
668 g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
669 reader);
670 }
671
672 static void
splitmux_part_reader_dispose(GObject * object)673 splitmux_part_reader_dispose (GObject * object)
674 {
675 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
676
677 splitmux_part_reader_reset (reader);
678
679 G_OBJECT_CLASS (parent_class)->dispose (object);
680 }
681
682 static void
splitmux_part_reader_finalize(GObject * object)683 splitmux_part_reader_finalize (GObject * object)
684 {
685 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
686
687 g_cond_clear (&reader->inactive_cond);
688 g_mutex_clear (&reader->lock);
689 g_mutex_clear (&reader->type_lock);
690 g_mutex_clear (&reader->msg_lock);
691
692 g_free (reader->path);
693
694 G_OBJECT_CLASS (parent_class)->finalize (object);
695 }
696
697 static void
do_async_start(GstSplitMuxPartReader * reader)698 do_async_start (GstSplitMuxPartReader * reader)
699 {
700 GstMessage *message;
701
702 SPLITMUX_PART_MSG_LOCK (reader);
703 reader->async_pending = TRUE;
704
705 message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
706 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
707 SPLITMUX_PART_MSG_UNLOCK (reader);
708 }
709
710 static void
do_async_done(GstSplitMuxPartReader * reader)711 do_async_done (GstSplitMuxPartReader * reader)
712 {
713 GstMessage *message;
714
715 SPLITMUX_PART_MSG_LOCK (reader);
716 if (reader->async_pending) {
717 message =
718 gst_message_new_async_done (GST_OBJECT_CAST (reader),
719 GST_CLOCK_TIME_NONE);
720 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
721 message);
722
723 reader->async_pending = FALSE;
724 }
725 SPLITMUX_PART_MSG_UNLOCK (reader);
726 }
727
728 static void
splitmux_part_reader_reset(GstSplitMuxPartReader * reader)729 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
730 {
731 GList *cur;
732
733 SPLITMUX_PART_LOCK (reader);
734 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
735 GstPad *pad = GST_PAD_CAST (cur->data);
736 gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
737 gst_object_unref (GST_OBJECT_CAST (pad));
738 }
739
740 g_list_free (reader->pads);
741 reader->pads = NULL;
742 SPLITMUX_PART_UNLOCK (reader);
743 }
744
745 static GstSplitMuxPartPad *
gst_splitmux_part_reader_new_proxy_pad(GstSplitMuxPartReader * reader,GstPad * target)746 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
747 GstPad * target)
748 {
749 GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
750 "name", GST_PAD_NAME (target),
751 "direction", GST_PAD_SINK,
752 NULL);
753 pad->target = target;
754 pad->reader = reader;
755
756 gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
757
758 return pad;
759 }
760
761 static void
new_decoded_pad_added_cb(GstElement * element,GstPad * pad,GstSplitMuxPartReader * reader)762 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
763 GstSplitMuxPartReader * reader)
764 {
765 GstPad *out_pad = NULL;
766 GstSplitMuxPartPad *proxy_pad;
767 GstCaps *caps;
768 GstPadLinkReturn link_ret;
769
770 caps = gst_pad_get_current_caps (pad);
771
772 GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
773 " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
774
775 gst_caps_unref (caps);
776
777 /* Look up or create the output pad */
778 if (reader->get_pad_cb)
779 out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
780 if (out_pad == NULL) {
781 GST_DEBUG_OBJECT (reader,
782 "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
783 return;
784 }
785
786 /* Create our proxy pad to interact with this new pad */
787 proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
788 GST_DEBUG_OBJECT (reader,
789 "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
790 proxy_pad, out_pad);
791
792 link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
793 if (link_ret != GST_PAD_LINK_OK) {
794 gst_object_unref (proxy_pad);
795 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
796 ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
797 " ret %d", reader->path, pad, link_ret));
798 return;
799 }
800 GST_DEBUG_OBJECT (reader,
801 "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
802 pad, proxy_pad);
803
804 SPLITMUX_PART_LOCK (reader);
805 reader->pads = g_list_prepend (reader->pads, proxy_pad);
806 SPLITMUX_PART_UNLOCK (reader);
807 }
808
809 static gboolean
gst_splitmux_part_reader_send_event(GstElement * element,GstEvent * event)810 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
811 {
812 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
813 gboolean ret = FALSE;
814 GstPad *pad = NULL;
815
816 /* Send event to the first source pad we found */
817 SPLITMUX_PART_LOCK (reader);
818 if (reader->pads) {
819 GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
820 pad = gst_pad_get_peer (proxy_pad);
821 }
822 SPLITMUX_PART_UNLOCK (reader);
823
824 if (pad) {
825 ret = gst_pad_send_event (pad, event);
826 gst_object_unref (pad);
827 } else {
828 gst_event_unref (event);
829 }
830
831 return ret;
832 }
833
834 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
835 static void
gst_splitmux_part_reader_seek_to_time_locked(GstSplitMuxPartReader * reader,GstClockTime time)836 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
837 GstClockTime time)
838 {
839 SPLITMUX_PART_UNLOCK (reader);
840 GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
841 GST_TIME_ARGS (time));
842 gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
843 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
844 GST_SEEK_TYPE_END, 0);
845
846 SPLITMUX_PART_LOCK (reader);
847
848 /* Wait for flush to finish, so old data is gone */
849 while (reader->flushing) {
850 GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
851 SPLITMUX_PART_WAIT (reader);
852 }
853 }
854
855 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
856 static gboolean
gst_splitmux_part_reader_seek_to_segment(GstSplitMuxPartReader * reader,GstSegment * target_seg,GstSeekFlags extra_flags)857 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
858 GstSegment * target_seg, GstSeekFlags extra_flags)
859 {
860 GstSeekFlags flags;
861 GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
862
863 flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
864
865 SPLITMUX_PART_LOCK (reader);
866 if (target_seg->start >= reader->start_offset)
867 start = target_seg->start - reader->start_offset;
868 /* If the segment stop is within this part, don't play to the end */
869 if (target_seg->stop != -1 &&
870 target_seg->stop < reader->start_offset + reader->duration)
871 stop = target_seg->stop - reader->start_offset;
872
873 SPLITMUX_PART_UNLOCK (reader);
874
875 GST_DEBUG_OBJECT (reader,
876 "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
877 GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
878 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
879
880 return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
881 target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
882 stop);
883 }
884
885 /* Called with lock held */
886 static void
gst_splitmux_part_reader_measure_streams(GstSplitMuxPartReader * reader)887 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
888 {
889 SPLITMUX_PART_LOCK (reader);
890 /* Trigger a flushing seek to near the end of the file and run each stream
891 * to EOS in order to find the smallest end timestamp to start the next
892 * file from
893 */
894 if (GST_CLOCK_TIME_IS_VALID (reader->duration)
895 && reader->duration > GST_SECOND) {
896 GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
897 gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
898 }
899 SPLITMUX_PART_UNLOCK (reader);
900 }
901
902 static void
gst_splitmux_part_reader_finish_measuring_streams(GstSplitMuxPartReader * reader)903 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
904 reader)
905 {
906 SPLITMUX_PART_LOCK (reader);
907 if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
908 /* Fire the prepared signal and go to READY state */
909 GST_DEBUG_OBJECT (reader,
910 "Stream measuring complete. File %s is now ready", reader->path);
911 reader->prep_state = PART_STATE_READY;
912 SPLITMUX_PART_UNLOCK (reader);
913 do_async_done (reader);
914 } else {
915 SPLITMUX_PART_UNLOCK (reader);
916 }
917 }
918
919 static GstElement *
find_demuxer(GstCaps * caps)920 find_demuxer (GstCaps * caps)
921 {
922 GList *factories =
923 gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
924 GST_RANK_MARGINAL);
925 GList *compat_elements;
926 GstElement *e = NULL;
927
928 if (factories == NULL)
929 return NULL;
930
931 compat_elements =
932 gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
933
934 if (compat_elements) {
935 /* Just take the first (highest ranked) option */
936 GstElementFactory *factory =
937 GST_ELEMENT_FACTORY_CAST (compat_elements->data);
938 e = gst_element_factory_create (factory, NULL);
939 gst_plugin_feature_list_free (compat_elements);
940 }
941
942 if (factories)
943 gst_plugin_feature_list_free (factories);
944
945 return e;
946 }
947
948 static void
type_found(GstElement * typefind,guint probability,GstCaps * caps,GstSplitMuxPartReader * reader)949 type_found (GstElement * typefind, guint probability,
950 GstCaps * caps, GstSplitMuxPartReader * reader)
951 {
952 GstElement *demux;
953
954 GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
955
956 /* typefind found a type. Look for the demuxer to handle it */
957 demux = reader->demux = find_demuxer (caps);
958 if (reader->demux == NULL) {
959 GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
960 return;
961 }
962
963 /* Connect to demux signals */
964 g_signal_connect (demux,
965 "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
966 g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
967
968 gst_element_set_locked_state (demux, TRUE);
969 gst_bin_add (GST_BIN_CAST (reader), demux);
970 gst_element_link_pads (reader->typefind, "src", demux, NULL);
971 gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
972 gst_element_set_locked_state (demux, FALSE);
973 }
974
975 static void
check_if_pads_collected(GstSplitMuxPartReader * reader)976 check_if_pads_collected (GstSplitMuxPartReader * reader)
977 {
978 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
979 /* Check we have all pads and each pad has seen a buffer */
980 if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
981 GST_DEBUG_OBJECT (reader,
982 "no more pads - file %s. Measuring stream length", reader->path);
983 reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
984 gst_element_call_async (GST_ELEMENT_CAST (reader),
985 (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
986 NULL, NULL);
987 }
988 }
989 }
990
991 static void
no_more_pads(GstElement * element,GstSplitMuxPartReader * reader)992 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
993 {
994 GstClockTime duration = GST_CLOCK_TIME_NONE;
995 GList *cur;
996 /* Query the minimum duration of any pad in this piece and store it.
997 * FIXME: Only consider audio and video */
998 SPLITMUX_PART_LOCK (reader);
999 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1000 GstPad *target = GST_PAD_CAST (cur->data);
1001 if (target) {
1002 gint64 cur_duration;
1003 if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
1004 GST_INFO_OBJECT (reader,
1005 "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
1006 reader->path, target, GST_TIME_ARGS (cur_duration));
1007 if (cur_duration < duration)
1008 duration = cur_duration;
1009 }
1010 }
1011 }
1012 GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
1013 reader->path, GST_TIME_ARGS (duration));
1014 reader->duration = (GstClockTime) duration;
1015
1016 reader->no_more_pads = TRUE;
1017
1018 check_if_pads_collected (reader);
1019 SPLITMUX_PART_UNLOCK (reader);
1020 }
1021
1022 gboolean
gst_splitmux_part_reader_src_query(GstSplitMuxPartReader * part,GstPad * src_pad,GstQuery * query)1023 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
1024 GstPad * src_pad, GstQuery * query)
1025 {
1026 GstPad *target = NULL;
1027 gboolean ret;
1028 GList *cur;
1029
1030 SPLITMUX_PART_LOCK (part);
1031 /* Find the pad corresponding to the visible output target pad */
1032 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
1033 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1034 if (part_pad->target == src_pad) {
1035 target = gst_object_ref (GST_OBJECT_CAST (part_pad));
1036 break;
1037 }
1038 }
1039 SPLITMUX_PART_UNLOCK (part);
1040
1041 if (target == NULL)
1042 return FALSE;
1043
1044 ret = gst_pad_peer_query (target, query);
1045
1046 if (ret == FALSE)
1047 goto out;
1048
1049 /* Post-massaging of queries */
1050 switch (GST_QUERY_TYPE (query)) {
1051 case GST_QUERY_POSITION:{
1052 GstFormat fmt;
1053 gint64 position;
1054
1055 gst_query_parse_position (query, &fmt, &position);
1056 if (fmt != GST_FORMAT_TIME)
1057 return FALSE;
1058 SPLITMUX_PART_LOCK (part);
1059 position += part->start_offset;
1060 GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1061 GST_TIME_ARGS (position));
1062 SPLITMUX_PART_UNLOCK (part);
1063
1064 gst_query_set_position (query, fmt, position);
1065 break;
1066 }
1067 default:
1068 break;
1069 }
1070
1071 out:
1072 gst_object_unref (target);
1073 return ret;
1074 }
1075
1076 static GstStateChangeReturn
gst_splitmux_part_reader_change_state(GstElement * element,GstStateChange transition)1077 gst_splitmux_part_reader_change_state (GstElement * element,
1078 GstStateChange transition)
1079 {
1080 GstStateChangeReturn ret;
1081 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1082
1083 switch (transition) {
1084 case GST_STATE_CHANGE_NULL_TO_READY:{
1085 break;
1086 }
1087 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1088 SPLITMUX_PART_LOCK (reader);
1089 g_object_set (reader->src, "location", reader->path, NULL);
1090 reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1091 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1092 reader->running = TRUE;
1093 SPLITMUX_PART_UNLOCK (reader);
1094
1095 /* we go to PAUSED asynchronously once all streams have been collected
1096 * and seeks to measure the stream lengths are done */
1097 do_async_start (reader);
1098 break;
1099 }
1100 case GST_STATE_CHANGE_READY_TO_NULL:
1101 case GST_STATE_CHANGE_PAUSED_TO_READY:
1102 SPLITMUX_PART_LOCK (reader);
1103 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1104 reader->running = FALSE;
1105 SPLITMUX_PART_BROADCAST (reader);
1106 SPLITMUX_PART_UNLOCK (reader);
1107 break;
1108 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1109 SPLITMUX_PART_LOCK (reader);
1110 reader->active = FALSE;
1111 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1112 SPLITMUX_PART_BROADCAST (reader);
1113 SPLITMUX_PART_UNLOCK (reader);
1114 break;
1115 default:
1116 break;
1117 }
1118
1119 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1120 if (ret == GST_STATE_CHANGE_FAILURE) {
1121 do_async_done (reader);
1122 goto beach;
1123 }
1124
1125 switch (transition) {
1126 case GST_STATE_CHANGE_READY_TO_PAUSED:
1127 ret = GST_STATE_CHANGE_ASYNC;
1128 break;
1129 case GST_STATE_CHANGE_PAUSED_TO_READY:
1130 do_async_done (reader);
1131 break;
1132 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1133 SPLITMUX_PART_LOCK (reader);
1134 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1135 reader->active = TRUE;
1136 SPLITMUX_PART_BROADCAST (reader);
1137 SPLITMUX_PART_UNLOCK (reader);
1138 break;
1139 case GST_STATE_CHANGE_READY_TO_NULL:
1140 reader->prep_state = PART_STATE_NULL;
1141 splitmux_part_reader_reset (reader);
1142 break;
1143 default:
1144 break;
1145 }
1146
1147 beach:
1148 return ret;
1149 }
1150
1151 gboolean
gst_splitmux_part_reader_prepare(GstSplitMuxPartReader * part)1152 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1153 {
1154 GstStateChangeReturn ret;
1155
1156 ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1157
1158 if (ret == GST_STATE_CHANGE_FAILURE)
1159 return FALSE;
1160
1161 return TRUE;
1162 }
1163
1164 void
gst_splitmux_part_reader_unprepare(GstSplitMuxPartReader * part)1165 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1166 {
1167 gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1168 }
1169
1170 void
gst_splitmux_part_reader_set_location(GstSplitMuxPartReader * reader,const gchar * path)1171 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1172 const gchar * path)
1173 {
1174 reader->path = g_strdup (path);
1175 }
1176
1177 gboolean
gst_splitmux_part_reader_activate(GstSplitMuxPartReader * reader,GstSegment * seg,GstSeekFlags extra_flags)1178 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1179 GstSegment * seg, GstSeekFlags extra_flags)
1180 {
1181 GST_DEBUG_OBJECT (reader, "Activating part reader");
1182
1183 if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
1184 GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1185 seg);
1186 return FALSE;
1187 }
1188 if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1189 GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1190 GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1191 return FALSE;
1192 }
1193 return TRUE;
1194 }
1195
1196 gboolean
gst_splitmux_part_reader_is_active(GstSplitMuxPartReader * part)1197 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1198 {
1199 gboolean ret;
1200
1201 SPLITMUX_PART_LOCK (part);
1202 ret = part->active;
1203 SPLITMUX_PART_UNLOCK (part);
1204
1205 return ret;
1206 }
1207
1208 void
gst_splitmux_part_reader_deactivate(GstSplitMuxPartReader * reader)1209 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1210 {
1211 GST_DEBUG_OBJECT (reader, "Deactivating reader");
1212 gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1213 }
1214
1215 void
gst_splitmux_part_reader_set_flushing_locked(GstSplitMuxPartReader * reader,gboolean flushing)1216 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1217 gboolean flushing)
1218 {
1219 GList *cur;
1220
1221 GST_LOG_OBJECT (reader, "%s dataqueues",
1222 flushing ? "Flushing" : "Done flushing");
1223 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1224 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1225 gst_data_queue_set_flushing (part_pad->queue, flushing);
1226 if (flushing)
1227 gst_data_queue_flush (part_pad->queue);
1228 }
1229 };
1230
1231 void
gst_splitmux_part_reader_set_callbacks(GstSplitMuxPartReader * reader,gpointer cb_data,GstSplitMuxPartReaderPadCb get_pad_cb)1232 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1233 gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1234 {
1235 reader->cb_data = cb_data;
1236 reader->get_pad_cb = get_pad_cb;
1237 }
1238
1239 GstClockTime
gst_splitmux_part_reader_get_end_offset(GstSplitMuxPartReader * reader)1240 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1241 {
1242 GList *cur;
1243 GstClockTime ret = GST_CLOCK_TIME_NONE;
1244
1245 SPLITMUX_PART_LOCK (reader);
1246 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1247 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1248 if (!part_pad->is_sparse && part_pad->max_ts < ret)
1249 ret = part_pad->max_ts;
1250 }
1251
1252 SPLITMUX_PART_UNLOCK (reader);
1253
1254 return ret;
1255 }
1256
1257 void
gst_splitmux_part_reader_set_start_offset(GstSplitMuxPartReader * reader,GstClockTime time_offset,GstClockTime ts_offset)1258 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1259 GstClockTime time_offset, GstClockTime ts_offset)
1260 {
1261 SPLITMUX_PART_LOCK (reader);
1262 reader->start_offset = time_offset;
1263 reader->ts_offset = ts_offset;
1264 GST_INFO_OBJECT (reader, "Time offset now %" GST_TIME_FORMAT,
1265 GST_TIME_ARGS (time_offset));
1266 SPLITMUX_PART_UNLOCK (reader);
1267 }
1268
1269 GstClockTime
gst_splitmux_part_reader_get_start_offset(GstSplitMuxPartReader * reader)1270 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1271 {
1272 GstClockTime ret = GST_CLOCK_TIME_NONE;
1273
1274 SPLITMUX_PART_LOCK (reader);
1275 ret = reader->start_offset;
1276 SPLITMUX_PART_UNLOCK (reader);
1277
1278 return ret;
1279 }
1280
1281 GstClockTime
gst_splitmux_part_reader_get_duration(GstSplitMuxPartReader * reader)1282 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1283 {
1284 GstClockTime dur;
1285
1286 SPLITMUX_PART_LOCK (reader);
1287 dur = reader->duration;
1288 SPLITMUX_PART_UNLOCK (reader);
1289
1290 return dur;
1291 }
1292
1293 GstPad *
gst_splitmux_part_reader_lookup_pad(GstSplitMuxPartReader * reader,GstPad * target)1294 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1295 GstPad * target)
1296 {
1297 GstPad *result = NULL;
1298 GList *cur;
1299
1300 SPLITMUX_PART_LOCK (reader);
1301 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1302 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1303 if (part_pad->target == target) {
1304 result = (GstPad *) gst_object_ref (part_pad);
1305 break;
1306 }
1307 }
1308 SPLITMUX_PART_UNLOCK (reader);
1309
1310 return result;
1311 }
1312
1313 GstFlowReturn
gst_splitmux_part_reader_pop(GstSplitMuxPartReader * reader,GstPad * pad,GstDataQueueItem ** item)1314 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1315 GstDataQueueItem ** item)
1316 {
1317 GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1318 GstDataQueue *q;
1319 GstFlowReturn ret;
1320
1321 /* Get one item from the appropriate dataqueue */
1322 SPLITMUX_PART_LOCK (reader);
1323 if (reader->prep_state == PART_STATE_FAILED) {
1324 SPLITMUX_PART_UNLOCK (reader);
1325 return GST_FLOW_ERROR;
1326 }
1327
1328 q = gst_object_ref (part_pad->queue);
1329
1330 /* Have to drop the lock around pop, so we can be woken up for flush */
1331 SPLITMUX_PART_UNLOCK (reader);
1332 if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1333 ret = GST_FLOW_FLUSHING;
1334 goto out;
1335 }
1336
1337 SPLITMUX_PART_LOCK (reader);
1338
1339 SPLITMUX_PART_BROADCAST (reader);
1340 if (GST_IS_EVENT ((*item)->object)) {
1341 GstEvent *e = (GstEvent *) ((*item)->object);
1342 /* Mark this pad as EOS */
1343 if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1344 part_pad->is_eos = TRUE;
1345 }
1346
1347 SPLITMUX_PART_UNLOCK (reader);
1348
1349 ret = GST_FLOW_OK;
1350 out:
1351 gst_object_unref (q);
1352 return ret;
1353 }
1354
1355 static void
bus_handler(GstBin * bin,GstMessage * message)1356 bus_handler (GstBin * bin, GstMessage * message)
1357 {
1358 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1359
1360 switch (GST_MESSAGE_TYPE (message)) {
1361 case GST_MESSAGE_ERROR:
1362 /* Make sure to set the state to failed and wake up the listener
1363 * on error */
1364 SPLITMUX_PART_LOCK (reader);
1365 GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
1366 " marking this reader as failed", GST_MESSAGE_SRC (message));
1367 reader->prep_state = PART_STATE_FAILED;
1368 SPLITMUX_PART_BROADCAST (reader);
1369 SPLITMUX_PART_UNLOCK (reader);
1370 do_async_done (reader);
1371 break;
1372 default:
1373 break;
1374 }
1375
1376 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1377 }
1378