1 /* GStreamer Split Demuxer bin that recombines files created by
2 * the splitmuxsink element.
3 *
4 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31
/* Helpers for the reader's main mutex ('lock') and for the 'inactive_cond'
 * condition variable, which is waited on with that mutex */
#define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
#define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
#define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
#define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)

/* Helpers for the reader's separate 'type_lock' mutex */
#define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
#define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39
/* Sink pad created by a part reader for each pad its demuxer exposes.
 * Buffers and events received on it are stored in 'queue' on behalf of
 * the matching splitmuxsrc output pad */
typedef struct _GstSplitMuxPartPad
{
  GstPad parent;

  /* Reader we belong to */
  GstSplitMuxPartReader *reader;
  /* Output splitmuxsrc source pad */
  GstPad *target;

  /* Items accepted by the chain and event functions are pushed here */
  GstDataQueue *queue;

  /* Set when EOS arrives while measuring; cleared on flush-stop */
  gboolean is_eos;
  /* Set on flush-start, cleared on flush-stop */
  gboolean flushing;
  /* Set once a buffer was handled while measuring; cleared on flush-stop */
  gboolean seen_buffer;

  /* Taken from the sparse flag of the stream-start event */
  gboolean is_sparse;
  /* Largest adjusted end timestamp seen on this pad while measuring */
  GstClockTime max_ts;
  /* Last received segment, shifted by the reader's start offset */
  GstSegment segment;

  /* Last received segment, exactly as it arrived */
  GstSegment orig_segment;
  /* Offset computed from orig_segment at the first buffer seen while
   * collecting streams; subtracted when adjusting buffer timestamps */
  GstClockTime initial_ts_offset;
} GstSplitMuxPartPad;
62
/* Class structure for GstSplitMuxPartPad; it adds no members of its own
 * to the parent GstPadClass */
typedef struct _GstSplitMuxPartPadClass
{
  GstPadClass parent;
} GstSplitMuxPartPadClass;
67
68 static GType gst_splitmux_part_pad_get_type (void);
69 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
70 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
71
72 static void splitmux_part_pad_constructed (GObject * pad);
73 static void splitmux_part_pad_finalize (GObject * pad);
74 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
75 GstSplitMuxPartPad * part_pad, GstBuffer * buf);
76
77 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
78 guint visible, guint bytes, guint64 time, gpointer checkdata);
79 static void type_found (GstElement * typefind, guint probability,
80 GstCaps * caps, GstSplitMuxPartReader * reader);
81 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
82
83 static void
84 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
85 reader);
86
87 /* Called with reader lock held */
88 static gboolean
have_empty_queue(GstSplitMuxPartReader * reader)89 have_empty_queue (GstSplitMuxPartReader * reader)
90 {
91 GList *cur;
92
93 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
94 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
95 if (part_pad->is_eos) {
96 GST_LOG_OBJECT (part_pad, "Pad is EOS");
97 return TRUE;
98 }
99 if (gst_data_queue_is_empty (part_pad->queue)) {
100 GST_LOG_OBJECT (part_pad, "Queue is empty");
101 return TRUE;
102 }
103 }
104
105 return FALSE;
106 }
107
108 /* Called with reader lock held */
109 static gboolean
block_until_can_push(GstSplitMuxPartReader * reader)110 block_until_can_push (GstSplitMuxPartReader * reader)
111 {
112 while (reader->running) {
113 if (reader->flushing)
114 goto out;
115 if (reader->active && have_empty_queue (reader))
116 goto out;
117
118 GST_LOG_OBJECT (reader,
119 "Waiting for activation or empty queue on reader %s", reader->path);
120 SPLITMUX_PART_WAIT (reader);
121 }
122
123 GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
124 reader->path, reader->active, reader->flushing);
125 out:
126 return reader->active && !reader->flushing;
127 }
128
129 static void
handle_buffer_measuring(GstSplitMuxPartReader * reader,GstSplitMuxPartPad * part_pad,GstBuffer * buf)130 handle_buffer_measuring (GstSplitMuxPartReader * reader,
131 GstSplitMuxPartPad * part_pad, GstBuffer * buf)
132 {
133 GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
134 GstClockTimeDiff offset;
135
136 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
137 !part_pad->seen_buffer) {
138 /* If this is the first buffer on the pad in the collect_streams state,
139 * then calculate inital offset based on running time of this segment */
140 part_pad->initial_ts_offset =
141 part_pad->orig_segment.start + part_pad->orig_segment.base -
142 part_pad->orig_segment.time;
143 GST_DEBUG_OBJECT (reader,
144 "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
145 part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
146 }
147 part_pad->seen_buffer = TRUE;
148
149 /* Adjust buffer timestamps */
150 offset = reader->start_offset + part_pad->segment.base;
151 offset -= part_pad->initial_ts_offset;
152
153 /* Update the stored max duration on the pad,
154 * always preferring making DTS contiguous
155 * where possible */
156 if (GST_BUFFER_DTS_IS_VALID (buf))
157 ts = GST_BUFFER_DTS (buf) + offset;
158 else if (GST_BUFFER_PTS_IS_VALID (buf))
159 ts = GST_BUFFER_PTS (buf) + offset;
160
161 GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
162 " incoming PTS %" GST_TIME_FORMAT
163 " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
164 " to %" GST_STIME_FORMAT, part_pad,
165 GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
166 GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
167 GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
168
169 if (GST_CLOCK_STIME_IS_VALID (ts)) {
170 if (GST_BUFFER_DURATION_IS_VALID (buf))
171 ts += GST_BUFFER_DURATION (buf);
172
173 if (GST_CLOCK_STIME_IS_VALID (ts)
174 && ts > (GstClockTimeDiff) part_pad->max_ts) {
175 part_pad->max_ts = ts;
176 GST_LOG_OBJECT (reader,
177 "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
178 GST_TIME_ARGS (part_pad->max_ts));
179 }
180 }
181 /* Is it time to move to measuring state yet? */
182 check_if_pads_collected (reader);
183 }
184
185 static gboolean
splitmux_data_queue_is_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)186 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
187 guint visible, guint bytes, guint64 time, gpointer checkdata)
188 {
189 /* Arbitrary safety limit. If we hit it, playback is likely to stall */
190 if (time > 20 * GST_SECOND)
191 return TRUE;
192 return FALSE;
193 }
194
195 static void
splitmux_part_free_queue_item(GstDataQueueItem * item)196 splitmux_part_free_queue_item (GstDataQueueItem * item)
197 {
198 gst_mini_object_unref (item->object);
199 g_slice_free (GstDataQueueItem, item);
200 }
201
202 static GstFlowReturn
splitmux_part_pad_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)203 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
204 {
205 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
206 GstSplitMuxPartReader *reader = part_pad->reader;
207 GstDataQueueItem *item;
208 GstClockTimeDiff offset;
209
210 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
211 SPLITMUX_PART_LOCK (reader);
212
213 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
214 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
215 handle_buffer_measuring (reader, part_pad, buf);
216 gst_buffer_unref (buf);
217 SPLITMUX_PART_UNLOCK (reader);
218 return GST_FLOW_OK;
219 }
220
221 if (!block_until_can_push (reader)) {
222 /* Flushing */
223 SPLITMUX_PART_UNLOCK (reader);
224 gst_buffer_unref (buf);
225 return GST_FLOW_FLUSHING;
226 }
227
228 /* Adjust buffer timestamps */
229 offset = reader->start_offset + part_pad->segment.base;
230 offset -= part_pad->initial_ts_offset;
231
232 if (GST_BUFFER_PTS_IS_VALID (buf))
233 GST_BUFFER_PTS (buf) += offset;
234 if (GST_BUFFER_DTS_IS_VALID (buf))
235 GST_BUFFER_DTS (buf) += offset;
236
237 /* We are active, and one queue is empty, place this buffer in
238 * the dataqueue */
239 GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
240 item = g_slice_new (GstDataQueueItem);
241 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
242 item->object = GST_MINI_OBJECT (buf);
243 item->size = gst_buffer_get_size (buf);
244 item->duration = GST_BUFFER_DURATION (buf);
245 if (item->duration == GST_CLOCK_TIME_NONE)
246 item->duration = 0;
247 item->visible = TRUE;
248
249 gst_object_ref (part_pad);
250
251 SPLITMUX_PART_UNLOCK (reader);
252
253 if (!gst_data_queue_push (part_pad->queue, item)) {
254 splitmux_part_free_queue_item (item);
255 gst_object_unref (part_pad);
256 return GST_FLOW_FLUSHING;
257 }
258
259 gst_object_unref (part_pad);
260 return GST_FLOW_OK;
261 }
262
263 /* Called with splitmux part lock held */
264 static gboolean
splitmux_part_is_eos_locked(GstSplitMuxPartReader * part)265 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
266 {
267 GList *cur;
268 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
269 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
270 if (!part_pad->is_eos)
271 return FALSE;
272 }
273
274 return TRUE;
275 }
276
277 static gboolean
splitmux_part_is_prerolled_locked(GstSplitMuxPartReader * part)278 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
279 {
280 GList *cur;
281 GST_LOG_OBJECT (part, "Checking for preroll");
282 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
283 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
284 if (!part_pad->seen_buffer) {
285 GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
286 part_pad);
287 return FALSE;
288 }
289 }
290 GST_LOG_OBJECT (part, "Part is prerolled");
291 return TRUE;
292 }
293
294
295 gboolean
gst_splitmux_part_is_eos(GstSplitMuxPartReader * reader)296 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
297 {
298 gboolean res;
299
300 SPLITMUX_PART_LOCK (reader);
301 res = splitmux_part_is_eos_locked (reader);
302 SPLITMUX_PART_UNLOCK (reader);
303
304 return res;
305 }
306
307 /* Called with splitmux part lock held */
308 static gboolean
splitmux_is_flushing(GstSplitMuxPartReader * reader)309 splitmux_is_flushing (GstSplitMuxPartReader * reader)
310 {
311 GList *cur;
312 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
313 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
314 if (part_pad->flushing)
315 return TRUE;
316 }
317
318 return FALSE;
319 }
320
321 static gboolean
splitmux_part_pad_event(GstPad * pad,GstObject * parent,GstEvent * event)322 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
323 {
324 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
325 GstSplitMuxPartReader *reader = part_pad->reader;
326 gboolean ret = TRUE;
327 SplitMuxSrcPad *target;
328 GstDataQueueItem *item;
329
330 SPLITMUX_PART_LOCK (reader);
331
332 target = gst_object_ref (part_pad->target);
333
334 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
335 event);
336
337 if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
338 goto drop_event;
339
340 switch (GST_EVENT_TYPE (event)) {
341 case GST_EVENT_STREAM_START:{
342 GstStreamFlags flags;
343 gst_event_parse_stream_flags (event, &flags);
344 part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
345 break;
346 }
347 case GST_EVENT_SEGMENT:{
348 GstSegment *seg = &part_pad->segment;
349
350 GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
351
352 gst_event_copy_segment (event, seg);
353 gst_event_copy_segment (event, &part_pad->orig_segment);
354
355 if (seg->format != GST_FORMAT_TIME)
356 goto wrong_segment;
357
358 /* Adjust segment */
359 /* Adjust start/stop so the overall file is 0 + start_offset based */
360 if (seg->stop != -1) {
361 seg->stop -= seg->start;
362 seg->stop += seg->time + reader->start_offset;
363 }
364 seg->start = seg->time + reader->start_offset;
365 seg->time += reader->start_offset;
366 seg->position += reader->start_offset;
367
368 GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
369
370 /* Replace event */
371 gst_event_unref (event);
372 event = gst_event_new_segment (seg);
373
374 if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
375 && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
376 break; /* Only do further stuff with segments during initial measuring */
377
378 /* Take the first segment from the first part */
379 if (target->segment.format == GST_FORMAT_UNDEFINED) {
380 gst_segment_copy_into (seg, &target->segment);
381 GST_DEBUG_OBJECT (reader,
382 "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
383 }
384
385 if (seg->stop != -1 && target->segment.stop != -1) {
386 GstClockTime stop = seg->base + seg->stop;
387 if (stop > target->segment.stop) {
388 target->segment.stop = stop;
389 GST_DEBUG_OBJECT (reader,
390 "Adjusting segment stop by %" GST_TIME_FORMAT
391 " output now %" GST_SEGMENT_FORMAT,
392 GST_TIME_ARGS (reader->start_offset), &target->segment);
393 }
394 }
395 GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
396 break;
397 }
398 case GST_EVENT_EOS:{
399
400 GST_DEBUG_OBJECT (part_pad,
401 "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
402 reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
403
404 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
405 reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
406 /* Mark this pad as EOS */
407 part_pad->is_eos = TRUE;
408 if (splitmux_part_is_eos_locked (reader)) {
409 /* Finished measuring things, set state and tell the state change func
410 * so it can seek back to the start */
411 GST_LOG_OBJECT (reader,
412 "EOS while measuring streams. Resetting for ready");
413 reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
414
415 gst_element_call_async (GST_ELEMENT_CAST (reader),
416 (GstElementCallAsyncFunc)
417 gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
418 }
419 goto drop_event;
420 }
421 break;
422 }
423 case GST_EVENT_FLUSH_START:
424 reader->flushing = TRUE;
425 part_pad->flushing = TRUE;
426 GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
427 part_pad);
428 gst_data_queue_set_flushing (part_pad->queue, TRUE);
429 SPLITMUX_PART_BROADCAST (reader);
430 break;
431 case GST_EVENT_FLUSH_STOP:{
432 gst_data_queue_set_flushing (part_pad->queue, FALSE);
433 gst_data_queue_flush (part_pad->queue);
434 part_pad->seen_buffer = FALSE;
435 part_pad->flushing = FALSE;
436 part_pad->is_eos = FALSE;
437
438 reader->flushing = splitmux_is_flushing (reader);
439 GST_LOG_OBJECT (reader,
440 "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
441 reader->path, pad, reader->flushing);
442 SPLITMUX_PART_BROADCAST (reader);
443 break;
444 }
445 default:
446 break;
447 }
448
449 /* Don't send events downstream while preparing */
450 if (reader->prep_state != PART_STATE_READY)
451 goto drop_event;
452
453 /* Don't pass flush events - those are done by the parent */
454 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
455 GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
456 goto drop_event;
457
458 if (!block_until_can_push (reader))
459 goto drop_event;
460
461 switch (GST_EVENT_TYPE (event)) {
462 case GST_EVENT_GAP:{
463 /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
464 goto drop_event;
465 }
466 default:
467 break;
468 }
469
470 /* We are active, and one queue is empty, place this buffer in
471 * the dataqueue */
472 gst_object_ref (part_pad->queue);
473 SPLITMUX_PART_UNLOCK (reader);
474
475 GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
476 item = g_slice_new (GstDataQueueItem);
477 item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
478 item->object = GST_MINI_OBJECT (event);
479 item->size = 0;
480 item->duration = 0;
481 if (item->duration == GST_CLOCK_TIME_NONE)
482 item->duration = 0;
483 item->visible = FALSE;
484
485 if (!gst_data_queue_push (part_pad->queue, item)) {
486 splitmux_part_free_queue_item (item);
487 ret = FALSE;
488 }
489
490 gst_object_unref (part_pad->queue);
491 gst_object_unref (target);
492
493 return ret;
494 wrong_segment:
495 gst_event_unref (event);
496 gst_object_unref (target);
497 SPLITMUX_PART_UNLOCK (reader);
498 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
499 ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
500 reader->path, pad));
501 return FALSE;
502 drop_event:
503 GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
504 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
505 gst_event_unref (event);
506 gst_object_unref (target);
507 SPLITMUX_PART_UNLOCK (reader);
508 return TRUE;
509 }
510
511 static gboolean
splitmux_part_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)512 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
513 {
514 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
515 GstSplitMuxPartReader *reader = part_pad->reader;
516 GstPad *target;
517 gboolean ret = FALSE;
518 gboolean active;
519
520 SPLITMUX_PART_LOCK (reader);
521 target = gst_object_ref (part_pad->target);
522 active = reader->active;
523 SPLITMUX_PART_UNLOCK (reader);
524
525 if (active) {
526 GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
527 " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
528
529 ret = gst_pad_query (target, query);
530 }
531
532 gst_object_unref (target);
533
534 return ret;
535 }
536
537 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
538
539 static void
splitmux_part_pad_constructed(GObject * pad)540 splitmux_part_pad_constructed (GObject * pad)
541 {
542 gst_pad_set_chain_function (GST_PAD (pad),
543 GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
544 gst_pad_set_event_function (GST_PAD (pad),
545 GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
546 gst_pad_set_query_function (GST_PAD (pad),
547 GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
548
549 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
550 }
551
552 static void
gst_splitmux_part_pad_class_init(GstSplitMuxPartPadClass * klass)553 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
554 {
555 GObjectClass *gobject_klass = (GObjectClass *) (klass);
556
557 gobject_klass->constructed = splitmux_part_pad_constructed;
558 gobject_klass->finalize = splitmux_part_pad_finalize;
559 }
560
561 static void
gst_splitmux_part_pad_init(GstSplitMuxPartPad * pad)562 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
563 {
564 pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
565 NULL, NULL, pad);
566 gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
567 gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
568 }
569
570 static void
splitmux_part_pad_finalize(GObject * obj)571 splitmux_part_pad_finalize (GObject * obj)
572 {
573 GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
574
575 GST_DEBUG_OBJECT (obj, "finalize");
576 gst_data_queue_set_flushing (pad->queue, TRUE);
577 gst_data_queue_flush (pad->queue);
578 gst_object_unref (GST_OBJECT_CAST (pad->queue));
579 pad->queue = NULL;
580
581 G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
582 }
583
584 static void
585 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
586 GstSplitMuxPartReader * part);
587 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
588 static GstStateChangeReturn
589 gst_splitmux_part_reader_change_state (GstElement * element,
590 GstStateChange transition);
591 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
592 GstEvent * event);
593 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
594 * part, gboolean flushing);
595 static void bus_handler (GstBin * bin, GstMessage * msg);
596 static void splitmux_part_reader_dispose (GObject * object);
597 static void splitmux_part_reader_finalize (GObject * object);
598 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
599
600 #define gst_splitmux_part_reader_parent_class parent_class
601 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
602 GST_TYPE_PIPELINE);
603
604 static void
gst_splitmux_part_reader_class_init(GstSplitMuxPartReaderClass * klass)605 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
606 {
607 GObjectClass *gobject_klass = (GObjectClass *) (klass);
608 GstElementClass *gstelement_class = (GstElementClass *) klass;
609 GstBinClass *gstbin_class = (GstBinClass *) klass;
610
611 GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
612 "Split File Demuxing Source helper");
613
614 gobject_klass->dispose = splitmux_part_reader_dispose;
615 gobject_klass->finalize = splitmux_part_reader_finalize;
616
617 gstelement_class->change_state = gst_splitmux_part_reader_change_state;
618 gstelement_class->send_event = gst_splitmux_part_reader_send_event;
619
620 gstbin_class->handle_message = bus_handler;
621 }
622
623 static void
gst_splitmux_part_reader_init(GstSplitMuxPartReader * reader)624 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
625 {
626 GstElement *typefind;
627
628 reader->active = FALSE;
629 reader->duration = GST_CLOCK_TIME_NONE;
630
631 g_cond_init (&reader->inactive_cond);
632 g_mutex_init (&reader->lock);
633 g_mutex_init (&reader->type_lock);
634
635 /* FIXME: Create elements on a state change */
636 reader->src = gst_element_factory_make ("filesrc", NULL);
637 if (reader->src == NULL) {
638 GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
639 return;
640 }
641 gst_bin_add (GST_BIN_CAST (reader), reader->src);
642
643 typefind = gst_element_factory_make ("typefind", NULL);
644 if (!typefind) {
645 GST_ERROR_OBJECT (reader,
646 "Failed to create typefind element - check your installation");
647 return;
648 }
649
650 gst_bin_add (GST_BIN_CAST (reader), typefind);
651 reader->typefind = typefind;
652
653 if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
654 GST_ERROR_OBJECT (reader,
655 "Failed to link typefind element - check your installation");
656 return;
657 }
658
659 g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
660 reader);
661 }
662
663 static void
splitmux_part_reader_dispose(GObject * object)664 splitmux_part_reader_dispose (GObject * object)
665 {
666 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
667
668 splitmux_part_reader_reset (reader);
669
670 G_OBJECT_CLASS (parent_class)->dispose (object);
671 }
672
673 static void
splitmux_part_reader_finalize(GObject * object)674 splitmux_part_reader_finalize (GObject * object)
675 {
676 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
677
678 g_cond_clear (&reader->inactive_cond);
679 g_mutex_clear (&reader->lock);
680 g_mutex_clear (&reader->type_lock);
681
682 g_free (reader->path);
683
684 G_OBJECT_CLASS (parent_class)->finalize (object);
685 }
686
687 static void
do_async_start(GstSplitMuxPartReader * reader)688 do_async_start (GstSplitMuxPartReader * reader)
689 {
690 GstMessage *message;
691
692 GST_STATE_LOCK (reader);
693 reader->async_pending = TRUE;
694
695 message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
696 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
697 GST_STATE_UNLOCK (reader);
698 }
699
700 static void
do_async_done(GstSplitMuxPartReader * reader)701 do_async_done (GstSplitMuxPartReader * reader)
702 {
703 GstMessage *message;
704
705 GST_STATE_LOCK (reader);
706 if (reader->async_pending) {
707 message =
708 gst_message_new_async_done (GST_OBJECT_CAST (reader),
709 GST_CLOCK_TIME_NONE);
710 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
711 message);
712
713 reader->async_pending = FALSE;
714 }
715 GST_STATE_UNLOCK (reader);
716 }
717
718 static void
splitmux_part_reader_reset(GstSplitMuxPartReader * reader)719 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
720 {
721 GList *cur;
722
723 SPLITMUX_PART_LOCK (reader);
724 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
725 GstPad *pad = GST_PAD_CAST (cur->data);
726 gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
727 gst_object_unref (GST_OBJECT_CAST (pad));
728 }
729
730 g_list_free (reader->pads);
731 reader->pads = NULL;
732 SPLITMUX_PART_UNLOCK (reader);
733 }
734
735 static GstSplitMuxPartPad *
gst_splitmux_part_reader_new_proxy_pad(GstSplitMuxPartReader * reader,GstPad * target)736 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
737 GstPad * target)
738 {
739 GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
740 "name", GST_PAD_NAME (target),
741 "direction", GST_PAD_SINK,
742 NULL);
743 pad->target = target;
744 pad->reader = reader;
745
746 gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
747
748 return pad;
749 }
750
751 static void
new_decoded_pad_added_cb(GstElement * element,GstPad * pad,GstSplitMuxPartReader * reader)752 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
753 GstSplitMuxPartReader * reader)
754 {
755 GstPad *out_pad = NULL;
756 GstSplitMuxPartPad *proxy_pad;
757 GstCaps *caps;
758 GstPadLinkReturn link_ret;
759
760 caps = gst_pad_get_current_caps (pad);
761
762 GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
763 " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
764
765 gst_caps_unref (caps);
766
767 /* Look up or create the output pad */
768 if (reader->get_pad_cb)
769 out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
770 if (out_pad == NULL) {
771 GST_DEBUG_OBJECT (reader,
772 "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
773 return;
774 }
775
776 /* Create our proxy pad to interact with this new pad */
777 proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
778 GST_DEBUG_OBJECT (reader,
779 "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
780 proxy_pad, out_pad);
781
782 link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
783 if (link_ret != GST_PAD_LINK_OK) {
784 gst_object_unref (proxy_pad);
785 GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
786 ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
787 " ret %d", reader->path, pad, link_ret));
788 return;
789 }
790 GST_DEBUG_OBJECT (reader,
791 "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
792 pad, proxy_pad);
793
794 SPLITMUX_PART_LOCK (reader);
795 reader->pads = g_list_prepend (reader->pads, proxy_pad);
796 SPLITMUX_PART_UNLOCK (reader);
797 }
798
799 static gboolean
gst_splitmux_part_reader_send_event(GstElement * element,GstEvent * event)800 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
801 {
802 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
803 gboolean ret = FALSE;
804 GstPad *pad = NULL;
805
806 /* Send event to the first source pad we found */
807 SPLITMUX_PART_LOCK (reader);
808 if (reader->pads) {
809 GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
810 pad = gst_pad_get_peer (proxy_pad);
811 }
812 SPLITMUX_PART_UNLOCK (reader);
813
814 if (pad) {
815 ret = gst_pad_send_event (pad, event);
816 gst_object_unref (pad);
817 } else {
818 gst_event_unref (event);
819 }
820
821 return ret;
822 }
823
824 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
825 static void
gst_splitmux_part_reader_seek_to_time_locked(GstSplitMuxPartReader * reader,GstClockTime time)826 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
827 GstClockTime time)
828 {
829 SPLITMUX_PART_UNLOCK (reader);
830 GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
831 GST_TIME_ARGS (time));
832 gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
833 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
834 GST_SEEK_TYPE_END, 0);
835
836 SPLITMUX_PART_LOCK (reader);
837
838 /* Wait for flush to finish, so old data is gone */
839 while (reader->flushing) {
840 GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
841 SPLITMUX_PART_WAIT (reader);
842 }
843 }
844
845 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
846 static gboolean
gst_splitmux_part_reader_seek_to_segment(GstSplitMuxPartReader * reader,GstSegment * target_seg,GstSeekFlags extra_flags)847 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
848 GstSegment * target_seg, GstSeekFlags extra_flags)
849 {
850 GstSeekFlags flags;
851 GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
852
853 flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
854
855 SPLITMUX_PART_LOCK (reader);
856 if (target_seg->start >= reader->start_offset)
857 start = target_seg->start - reader->start_offset;
858 /* If the segment stop is within this part, don't play to the end */
859 if (target_seg->stop != -1 &&
860 target_seg->stop < reader->start_offset + reader->duration)
861 stop = target_seg->stop - reader->start_offset;
862
863 SPLITMUX_PART_UNLOCK (reader);
864
865 GST_DEBUG_OBJECT (reader,
866 "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
867 GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
868 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
869
870 return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
871 target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
872 stop);
873 }
874
875 /* Called with lock held */
876 static void
gst_splitmux_part_reader_measure_streams(GstSplitMuxPartReader * reader)877 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
878 {
879 SPLITMUX_PART_LOCK (reader);
880 /* Trigger a flushing seek to near the end of the file and run each stream
881 * to EOS in order to find the smallest end timestamp to start the next
882 * file from
883 */
884 if (GST_CLOCK_TIME_IS_VALID (reader->duration)
885 && reader->duration > GST_SECOND) {
886 GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
887 gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
888 }
889 SPLITMUX_PART_UNLOCK (reader);
890 }
891
892 static void
gst_splitmux_part_reader_finish_measuring_streams(GstSplitMuxPartReader * reader)893 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
894 reader)
895 {
896 SPLITMUX_PART_LOCK (reader);
897 if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
898 /* Fire the prepared signal and go to READY state */
899 GST_DEBUG_OBJECT (reader,
900 "Stream measuring complete. File %s is now ready", reader->path);
901 reader->prep_state = PART_STATE_READY;
902 SPLITMUX_PART_UNLOCK (reader);
903 do_async_done (reader);
904 } else {
905 SPLITMUX_PART_UNLOCK (reader);
906 }
907 }
908
909 static GstElement *
find_demuxer(GstCaps * caps)910 find_demuxer (GstCaps * caps)
911 {
912 GList *factories =
913 gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
914 GST_RANK_MARGINAL);
915 GList *compat_elements;
916 GstElement *e = NULL;
917
918 if (factories == NULL)
919 return NULL;
920
921 compat_elements =
922 gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
923
924 if (compat_elements) {
925 /* Just take the first (highest ranked) option */
926 GstElementFactory *factory =
927 GST_ELEMENT_FACTORY_CAST (compat_elements->data);
928 e = gst_element_factory_create (factory, NULL);
929 gst_plugin_feature_list_free (compat_elements);
930 }
931
932 if (factories)
933 gst_plugin_feature_list_free (factories);
934
935 return e;
936 }
937
938 static void
type_found(GstElement * typefind,guint probability,GstCaps * caps,GstSplitMuxPartReader * reader)939 type_found (GstElement * typefind, guint probability,
940 GstCaps * caps, GstSplitMuxPartReader * reader)
941 {
942 GstElement *demux;
943
944 GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
945
946 /* typefind found a type. Look for the demuxer to handle it */
947 demux = reader->demux = find_demuxer (caps);
948 if (reader->demux == NULL) {
949 GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
950 return;
951 }
952
953 /* Connect to demux signals */
954 g_signal_connect (demux,
955 "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
956 g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
957
958 gst_element_set_locked_state (demux, TRUE);
959 gst_bin_add (GST_BIN_CAST (reader), demux);
960 gst_element_link_pads (reader->typefind, "src", demux, NULL);
961 gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
962 gst_element_set_locked_state (demux, FALSE);
963 }
964
965 static void
check_if_pads_collected(GstSplitMuxPartReader * reader)966 check_if_pads_collected (GstSplitMuxPartReader * reader)
967 {
968 if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
969 /* Check we have all pads and each pad has seen a buffer */
970 if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
971 GST_DEBUG_OBJECT (reader,
972 "no more pads - file %s. Measuring stream length", reader->path);
973 reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
974 gst_element_call_async (GST_ELEMENT_CAST (reader),
975 (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
976 NULL, NULL);
977 }
978 }
979 }
980
981 static void
no_more_pads(GstElement * element,GstSplitMuxPartReader * reader)982 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
983 {
984 GstClockTime duration = GST_CLOCK_TIME_NONE;
985 GList *cur;
986 /* Query the minimum duration of any pad in this piece and store it.
987 * FIXME: Only consider audio and video */
988 SPLITMUX_PART_LOCK (reader);
989 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
990 GstPad *target = GST_PAD_CAST (cur->data);
991 if (target) {
992 gint64 cur_duration;
993 if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
994 GST_INFO_OBJECT (reader,
995 "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
996 reader->path, target, GST_TIME_ARGS (cur_duration));
997 if (cur_duration < duration)
998 duration = cur_duration;
999 }
1000 }
1001 }
1002 GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
1003 reader->path, GST_TIME_ARGS (duration));
1004 reader->duration = (GstClockTime) duration;
1005
1006 reader->no_more_pads = TRUE;
1007
1008 check_if_pads_collected (reader);
1009 SPLITMUX_PART_UNLOCK (reader);
1010 }
1011
1012 gboolean
gst_splitmux_part_reader_src_query(GstSplitMuxPartReader * part,GstPad * src_pad,GstQuery * query)1013 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
1014 GstPad * src_pad, GstQuery * query)
1015 {
1016 GstPad *target = NULL;
1017 gboolean ret;
1018 GList *cur;
1019
1020 SPLITMUX_PART_LOCK (part);
1021 /* Find the pad corresponding to the visible output target pad */
1022 for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
1023 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1024 if (part_pad->target == src_pad) {
1025 target = gst_object_ref (GST_OBJECT_CAST (part_pad));
1026 break;
1027 }
1028 }
1029 SPLITMUX_PART_UNLOCK (part);
1030
1031 if (target == NULL)
1032 return FALSE;
1033
1034 ret = gst_pad_peer_query (target, query);
1035
1036 if (ret == FALSE)
1037 goto out;
1038
1039 /* Post-massaging of queries */
1040 switch (GST_QUERY_TYPE (query)) {
1041 case GST_QUERY_POSITION:{
1042 GstFormat fmt;
1043 gint64 position;
1044
1045 gst_query_parse_position (query, &fmt, &position);
1046 if (fmt != GST_FORMAT_TIME)
1047 return FALSE;
1048 SPLITMUX_PART_LOCK (part);
1049 position += part->start_offset;
1050 GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1051 GST_TIME_ARGS (position));
1052 SPLITMUX_PART_UNLOCK (part);
1053
1054 gst_query_set_position (query, fmt, position);
1055 break;
1056 }
1057 default:
1058 break;
1059 }
1060
1061 out:
1062 gst_object_unref (target);
1063 return ret;
1064 }
1065
1066 static GstStateChangeReturn
gst_splitmux_part_reader_change_state(GstElement * element,GstStateChange transition)1067 gst_splitmux_part_reader_change_state (GstElement * element,
1068 GstStateChange transition)
1069 {
1070 GstStateChangeReturn ret;
1071 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1072
1073 switch (transition) {
1074 case GST_STATE_CHANGE_NULL_TO_READY:{
1075 break;
1076 }
1077 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1078 SPLITMUX_PART_LOCK (reader);
1079 g_object_set (reader->src, "location", reader->path, NULL);
1080 reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1081 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1082 reader->running = TRUE;
1083 SPLITMUX_PART_UNLOCK (reader);
1084
1085 /* we go to PAUSED asynchronously once all streams have been collected
1086 * and seeks to measure the stream lengths are done */
1087 do_async_start (reader);
1088 break;
1089 }
1090 case GST_STATE_CHANGE_READY_TO_NULL:
1091 case GST_STATE_CHANGE_PAUSED_TO_READY:
1092 SPLITMUX_PART_LOCK (reader);
1093 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1094 reader->running = FALSE;
1095 SPLITMUX_PART_BROADCAST (reader);
1096 SPLITMUX_PART_UNLOCK (reader);
1097 break;
1098 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1099 SPLITMUX_PART_LOCK (reader);
1100 reader->active = FALSE;
1101 gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1102 SPLITMUX_PART_BROADCAST (reader);
1103 SPLITMUX_PART_UNLOCK (reader);
1104 break;
1105 default:
1106 break;
1107 }
1108
1109 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1110 if (ret == GST_STATE_CHANGE_FAILURE) {
1111 do_async_done (reader);
1112 goto beach;
1113 }
1114
1115 switch (transition) {
1116 case GST_STATE_CHANGE_READY_TO_PAUSED:
1117 ret = GST_STATE_CHANGE_ASYNC;
1118 break;
1119 case GST_STATE_CHANGE_PAUSED_TO_READY:
1120 do_async_done (reader);
1121 break;
1122 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1123 SPLITMUX_PART_LOCK (reader);
1124 gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1125 reader->active = TRUE;
1126 SPLITMUX_PART_BROADCAST (reader);
1127 SPLITMUX_PART_UNLOCK (reader);
1128 break;
1129 case GST_STATE_CHANGE_READY_TO_NULL:
1130 reader->prep_state = PART_STATE_NULL;
1131 splitmux_part_reader_reset (reader);
1132 break;
1133 default:
1134 break;
1135 }
1136
1137 beach:
1138 return ret;
1139 }
1140
1141 gboolean
gst_splitmux_part_reader_prepare(GstSplitMuxPartReader * part)1142 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1143 {
1144 GstStateChangeReturn ret;
1145
1146 ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1147
1148 if (ret == GST_STATE_CHANGE_FAILURE)
1149 return FALSE;
1150
1151 return TRUE;
1152 }
1153
1154 void
gst_splitmux_part_reader_unprepare(GstSplitMuxPartReader * part)1155 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1156 {
1157 gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1158 }
1159
1160 void
gst_splitmux_part_reader_set_location(GstSplitMuxPartReader * reader,const gchar * path)1161 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1162 const gchar * path)
1163 {
1164 reader->path = g_strdup (path);
1165 }
1166
1167 gboolean
gst_splitmux_part_reader_activate(GstSplitMuxPartReader * reader,GstSegment * seg,GstSeekFlags extra_flags)1168 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1169 GstSegment * seg, GstSeekFlags extra_flags)
1170 {
1171 GST_DEBUG_OBJECT (reader, "Activating part reader");
1172
1173 if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
1174 GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1175 seg);
1176 return FALSE;
1177 }
1178 if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1179 GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1180 GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1181 return FALSE;
1182 }
1183 return TRUE;
1184 }
1185
1186 gboolean
gst_splitmux_part_reader_is_active(GstSplitMuxPartReader * part)1187 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1188 {
1189 gboolean ret;
1190
1191 SPLITMUX_PART_LOCK (part);
1192 ret = part->active;
1193 SPLITMUX_PART_UNLOCK (part);
1194
1195 return ret;
1196 }
1197
1198 void
gst_splitmux_part_reader_deactivate(GstSplitMuxPartReader * reader)1199 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1200 {
1201 GST_DEBUG_OBJECT (reader, "Deactivating reader");
1202 gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1203 }
1204
1205 void
gst_splitmux_part_reader_set_flushing_locked(GstSplitMuxPartReader * reader,gboolean flushing)1206 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1207 gboolean flushing)
1208 {
1209 GList *cur;
1210
1211 GST_LOG_OBJECT (reader, "%s dataqueues",
1212 flushing ? "Flushing" : "Done flushing");
1213 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1214 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1215 gst_data_queue_set_flushing (part_pad->queue, flushing);
1216 if (flushing)
1217 gst_data_queue_flush (part_pad->queue);
1218 }
1219 };
1220
1221 void
gst_splitmux_part_reader_set_callbacks(GstSplitMuxPartReader * reader,gpointer cb_data,GstSplitMuxPartReaderPadCb get_pad_cb)1222 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1223 gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1224 {
1225 reader->cb_data = cb_data;
1226 reader->get_pad_cb = get_pad_cb;
1227 }
1228
1229 GstClockTime
gst_splitmux_part_reader_get_end_offset(GstSplitMuxPartReader * reader)1230 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1231 {
1232 GList *cur;
1233 GstClockTime ret = GST_CLOCK_TIME_NONE;
1234
1235 SPLITMUX_PART_LOCK (reader);
1236 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1237 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1238 if (!part_pad->is_sparse && part_pad->max_ts < ret)
1239 ret = part_pad->max_ts;
1240 }
1241
1242 SPLITMUX_PART_UNLOCK (reader);
1243
1244 return ret;
1245 }
1246
1247 void
gst_splitmux_part_reader_set_start_offset(GstSplitMuxPartReader * reader,GstClockTime offset)1248 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1249 GstClockTime offset)
1250 {
1251 SPLITMUX_PART_LOCK (reader);
1252 reader->start_offset = offset;
1253 GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1254 GST_TIME_ARGS (offset));
1255 SPLITMUX_PART_UNLOCK (reader);
1256 }
1257
1258 GstClockTime
gst_splitmux_part_reader_get_start_offset(GstSplitMuxPartReader * reader)1259 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1260 {
1261 GstClockTime ret = GST_CLOCK_TIME_NONE;
1262
1263 SPLITMUX_PART_LOCK (reader);
1264 ret = reader->start_offset;
1265 SPLITMUX_PART_UNLOCK (reader);
1266
1267 return ret;
1268 }
1269
1270 GstClockTime
gst_splitmux_part_reader_get_duration(GstSplitMuxPartReader * reader)1271 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1272 {
1273 GstClockTime dur;
1274
1275 SPLITMUX_PART_LOCK (reader);
1276 dur = reader->duration;
1277 SPLITMUX_PART_UNLOCK (reader);
1278
1279 return dur;
1280 }
1281
1282 GstPad *
gst_splitmux_part_reader_lookup_pad(GstSplitMuxPartReader * reader,GstPad * target)1283 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1284 GstPad * target)
1285 {
1286 GstPad *result = NULL;
1287 GList *cur;
1288
1289 SPLITMUX_PART_LOCK (reader);
1290 for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1291 GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1292 if (part_pad->target == target) {
1293 result = (GstPad *) gst_object_ref (part_pad);
1294 break;
1295 }
1296 }
1297 SPLITMUX_PART_UNLOCK (reader);
1298
1299 return result;
1300 }
1301
1302 GstFlowReturn
gst_splitmux_part_reader_pop(GstSplitMuxPartReader * reader,GstPad * pad,GstDataQueueItem ** item)1303 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1304 GstDataQueueItem ** item)
1305 {
1306 GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1307 GstDataQueue *q;
1308 GstFlowReturn ret;
1309
1310 /* Get one item from the appropriate dataqueue */
1311 SPLITMUX_PART_LOCK (reader);
1312 if (reader->prep_state == PART_STATE_FAILED) {
1313 SPLITMUX_PART_UNLOCK (reader);
1314 return GST_FLOW_ERROR;
1315 }
1316
1317 q = gst_object_ref (part_pad->queue);
1318
1319 /* Have to drop the lock around pop, so we can be woken up for flush */
1320 SPLITMUX_PART_UNLOCK (reader);
1321 if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1322 ret = GST_FLOW_FLUSHING;
1323 goto out;
1324 }
1325
1326 SPLITMUX_PART_LOCK (reader);
1327
1328 SPLITMUX_PART_BROADCAST (reader);
1329 if (GST_IS_EVENT ((*item)->object)) {
1330 GstEvent *e = (GstEvent *) ((*item)->object);
1331 /* Mark this pad as EOS */
1332 if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1333 part_pad->is_eos = TRUE;
1334 }
1335
1336 SPLITMUX_PART_UNLOCK (reader);
1337
1338 ret = GST_FLOW_OK;
1339 out:
1340 gst_object_unref (q);
1341 return ret;
1342 }
1343
1344 static void
bus_handler(GstBin * bin,GstMessage * message)1345 bus_handler (GstBin * bin, GstMessage * message)
1346 {
1347 GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1348
1349 switch (GST_MESSAGE_TYPE (message)) {
1350 case GST_MESSAGE_ERROR:
1351 /* Make sure to set the state to failed and wake up the listener
1352 * on error */
1353 SPLITMUX_PART_LOCK (reader);
1354 GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
1355 " marking this reader as failed", GST_MESSAGE_SRC (message));
1356 reader->prep_state = PART_STATE_FAILED;
1357 SPLITMUX_PART_BROADCAST (reader);
1358 SPLITMUX_PART_UNLOCK (reader);
1359 do_async_done (reader);
1360 break;
1361 default:
1362 break;
1363 }
1364
1365 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1366 }
1367