• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28 
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31 
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36 
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39 
40 typedef struct _GstSplitMuxPartPad
41 {
42   GstPad parent;
43 
44   /* Reader we belong to */
45   GstSplitMuxPartReader *reader;
46   /* Output splitmuxsrc source pad */
47   GstPad *target;
48 
49   GstDataQueue *queue;
50 
51   gboolean is_eos;
52   gboolean flushing;
53   gboolean seen_buffer;
54 
55   gboolean is_sparse;
56   GstClockTime max_ts;
57   GstSegment segment;
58 
59   GstSegment orig_segment;
60   GstClockTime initial_ts_offset;
61 } GstSplitMuxPartPad;
62 
63 typedef struct _GstSplitMuxPartPadClass
64 {
65   GstPadClass parent;
66 } GstSplitMuxPartPadClass;
67 
68 static GType gst_splitmux_part_pad_get_type (void);
69 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
70 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
71 
72 static void splitmux_part_pad_constructed (GObject * pad);
73 static void splitmux_part_pad_finalize (GObject * pad);
74 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
75     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
76 
77 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
78     guint visible, guint bytes, guint64 time, gpointer checkdata);
79 static void type_found (GstElement * typefind, guint probability,
80     GstCaps * caps, GstSplitMuxPartReader * reader);
81 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
82 
83 static void
84 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
85     reader);
86 
87 /* Called with reader lock held */
88 static gboolean
have_empty_queue(GstSplitMuxPartReader * reader)89 have_empty_queue (GstSplitMuxPartReader * reader)
90 {
91   GList *cur;
92 
93   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
94     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
95     if (part_pad->is_eos) {
96       GST_LOG_OBJECT (part_pad, "Pad is EOS");
97       return TRUE;
98     }
99     if (gst_data_queue_is_empty (part_pad->queue)) {
100       GST_LOG_OBJECT (part_pad, "Queue is empty");
101       return TRUE;
102     }
103   }
104 
105   return FALSE;
106 }
107 
108 /* Called with reader lock held */
109 static gboolean
block_until_can_push(GstSplitMuxPartReader * reader)110 block_until_can_push (GstSplitMuxPartReader * reader)
111 {
112   while (reader->running) {
113     if (reader->flushing)
114       goto out;
115     if (reader->active && have_empty_queue (reader))
116       goto out;
117 
118     GST_LOG_OBJECT (reader,
119         "Waiting for activation or empty queue on reader %s", reader->path);
120     SPLITMUX_PART_WAIT (reader);
121   }
122 
123   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
124       reader->path, reader->active, reader->flushing);
125 out:
126   return reader->active && !reader->flushing;
127 }
128 
129 static void
handle_buffer_measuring(GstSplitMuxPartReader * reader,GstSplitMuxPartPad * part_pad,GstBuffer * buf)130 handle_buffer_measuring (GstSplitMuxPartReader * reader,
131     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
132 {
133   GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
134   GstClockTimeDiff offset;
135 
136   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
137       !part_pad->seen_buffer) {
138     /* If this is the first buffer on the pad in the collect_streams state,
139      * then calculate inital offset based on running time of this segment */
140     part_pad->initial_ts_offset =
141         part_pad->orig_segment.start + part_pad->orig_segment.base -
142         part_pad->orig_segment.time;
143     GST_DEBUG_OBJECT (reader,
144         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
145         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
146   }
147   part_pad->seen_buffer = TRUE;
148 
149   /* Adjust buffer timestamps */
150   offset = reader->start_offset + part_pad->segment.base;
151   offset -= part_pad->initial_ts_offset;
152 
153   /* Update the stored max duration on the pad,
154    * always preferring making DTS contiguous
155    * where possible */
156   if (GST_BUFFER_DTS_IS_VALID (buf))
157     ts = GST_BUFFER_DTS (buf) + offset;
158   else if (GST_BUFFER_PTS_IS_VALID (buf))
159     ts = GST_BUFFER_PTS (buf) + offset;
160 
161   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
162       " incoming PTS %" GST_TIME_FORMAT
163       " DTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
164       " to %" GST_STIME_FORMAT, part_pad,
165       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
166       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
167       GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
168 
169   if (GST_CLOCK_STIME_IS_VALID (ts)) {
170     if (GST_BUFFER_DURATION_IS_VALID (buf))
171       ts += GST_BUFFER_DURATION (buf);
172 
173     if (GST_CLOCK_STIME_IS_VALID (ts)
174         && ts > (GstClockTimeDiff) part_pad->max_ts) {
175       part_pad->max_ts = ts;
176       GST_LOG_OBJECT (reader,
177           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
178           GST_TIME_ARGS (part_pad->max_ts));
179     }
180   }
181   /* Is it time to move to measuring state yet? */
182   check_if_pads_collected (reader);
183 }
184 
185 static gboolean
splitmux_data_queue_is_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)186 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
187     guint visible, guint bytes, guint64 time, gpointer checkdata)
188 {
189   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
190   if (time > 20 * GST_SECOND)
191     return TRUE;
192   return FALSE;
193 }
194 
195 static void
splitmux_part_free_queue_item(GstDataQueueItem * item)196 splitmux_part_free_queue_item (GstDataQueueItem * item)
197 {
198   gst_mini_object_unref (item->object);
199   g_slice_free (GstDataQueueItem, item);
200 }
201 
202 static GstFlowReturn
splitmux_part_pad_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)203 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
204 {
205   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
206   GstSplitMuxPartReader *reader = part_pad->reader;
207   GstDataQueueItem *item;
208   GstClockTimeDiff offset;
209 
210   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
211   SPLITMUX_PART_LOCK (reader);
212 
213   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
214       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
215     handle_buffer_measuring (reader, part_pad, buf);
216     gst_buffer_unref (buf);
217     SPLITMUX_PART_UNLOCK (reader);
218     return GST_FLOW_OK;
219   }
220 
221   if (!block_until_can_push (reader)) {
222     /* Flushing */
223     SPLITMUX_PART_UNLOCK (reader);
224     gst_buffer_unref (buf);
225     return GST_FLOW_FLUSHING;
226   }
227 
228   /* Adjust buffer timestamps */
229   offset = reader->start_offset + part_pad->segment.base;
230   offset -= part_pad->initial_ts_offset;
231 
232   if (GST_BUFFER_PTS_IS_VALID (buf))
233     GST_BUFFER_PTS (buf) += offset;
234   if (GST_BUFFER_DTS_IS_VALID (buf))
235     GST_BUFFER_DTS (buf) += offset;
236 
237   /* We are active, and one queue is empty, place this buffer in
238    * the dataqueue */
239   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
240   item = g_slice_new (GstDataQueueItem);
241   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
242   item->object = GST_MINI_OBJECT (buf);
243   item->size = gst_buffer_get_size (buf);
244   item->duration = GST_BUFFER_DURATION (buf);
245   if (item->duration == GST_CLOCK_TIME_NONE)
246     item->duration = 0;
247   item->visible = TRUE;
248 
249   gst_object_ref (part_pad);
250 
251   SPLITMUX_PART_UNLOCK (reader);
252 
253   if (!gst_data_queue_push (part_pad->queue, item)) {
254     splitmux_part_free_queue_item (item);
255     gst_object_unref (part_pad);
256     return GST_FLOW_FLUSHING;
257   }
258 
259   gst_object_unref (part_pad);
260   return GST_FLOW_OK;
261 }
262 
263 /* Called with splitmux part lock held */
264 static gboolean
splitmux_part_is_eos_locked(GstSplitMuxPartReader * part)265 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
266 {
267   GList *cur;
268   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
269     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
270     if (!part_pad->is_eos)
271       return FALSE;
272   }
273 
274   return TRUE;
275 }
276 
277 static gboolean
splitmux_part_is_prerolled_locked(GstSplitMuxPartReader * part)278 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
279 {
280   GList *cur;
281   GST_LOG_OBJECT (part, "Checking for preroll");
282   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
283     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
284     if (!part_pad->seen_buffer) {
285       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
286           part_pad);
287       return FALSE;
288     }
289   }
290   GST_LOG_OBJECT (part, "Part is prerolled");
291   return TRUE;
292 }
293 
294 
295 gboolean
gst_splitmux_part_is_eos(GstSplitMuxPartReader * reader)296 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
297 {
298   gboolean res;
299 
300   SPLITMUX_PART_LOCK (reader);
301   res = splitmux_part_is_eos_locked (reader);
302   SPLITMUX_PART_UNLOCK (reader);
303 
304   return res;
305 }
306 
307 /* Called with splitmux part lock held */
308 static gboolean
splitmux_is_flushing(GstSplitMuxPartReader * reader)309 splitmux_is_flushing (GstSplitMuxPartReader * reader)
310 {
311   GList *cur;
312   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
313     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
314     if (part_pad->flushing)
315       return TRUE;
316   }
317 
318   return FALSE;
319 }
320 
321 static gboolean
splitmux_part_pad_event(GstPad * pad,GstObject * parent,GstEvent * event)322 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
323 {
324   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
325   GstSplitMuxPartReader *reader = part_pad->reader;
326   gboolean ret = TRUE;
327   SplitMuxSrcPad *target;
328   GstDataQueueItem *item;
329 
330   SPLITMUX_PART_LOCK (reader);
331 
332   target = gst_object_ref (part_pad->target);
333 
334   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
335       event);
336 
337   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
338     goto drop_event;
339 
340   switch (GST_EVENT_TYPE (event)) {
341     case GST_EVENT_STREAM_START:{
342       GstStreamFlags flags;
343       gst_event_parse_stream_flags (event, &flags);
344       part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
345       break;
346     }
347     case GST_EVENT_SEGMENT:{
348       GstSegment *seg = &part_pad->segment;
349 
350       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
351 
352       gst_event_copy_segment (event, seg);
353       gst_event_copy_segment (event, &part_pad->orig_segment);
354 
355       if (seg->format != GST_FORMAT_TIME)
356         goto wrong_segment;
357 
358       /* Adjust segment */
359       /* Adjust start/stop so the overall file is 0 + start_offset based */
360       if (seg->stop != -1) {
361         seg->stop -= seg->start;
362         seg->stop += seg->time + reader->start_offset;
363       }
364       seg->start = seg->time + reader->start_offset;
365       seg->time += reader->start_offset;
366       seg->position += reader->start_offset;
367 
368       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
369 
370       /* Replace event */
371       gst_event_unref (event);
372       event = gst_event_new_segment (seg);
373 
374       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
375           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
376         break;                  /* Only do further stuff with segments during initial measuring */
377 
378       /* Take the first segment from the first part */
379       if (target->segment.format == GST_FORMAT_UNDEFINED) {
380         gst_segment_copy_into (seg, &target->segment);
381         GST_DEBUG_OBJECT (reader,
382             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
383       }
384 
385       if (seg->stop != -1 && target->segment.stop != -1) {
386         GstClockTime stop = seg->base + seg->stop;
387         if (stop > target->segment.stop) {
388           target->segment.stop = stop;
389           GST_DEBUG_OBJECT (reader,
390               "Adjusting segment stop by %" GST_TIME_FORMAT
391               " output now %" GST_SEGMENT_FORMAT,
392               GST_TIME_ARGS (reader->start_offset), &target->segment);
393         }
394       }
395       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
396       break;
397     }
398     case GST_EVENT_EOS:{
399 
400       GST_DEBUG_OBJECT (part_pad,
401           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
402           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
403 
404       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
405           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
406         /* Mark this pad as EOS */
407         part_pad->is_eos = TRUE;
408         if (splitmux_part_is_eos_locked (reader)) {
409           /* Finished measuring things, set state and tell the state change func
410            * so it can seek back to the start */
411           GST_LOG_OBJECT (reader,
412               "EOS while measuring streams. Resetting for ready");
413           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
414 
415           gst_element_call_async (GST_ELEMENT_CAST (reader),
416               (GstElementCallAsyncFunc)
417               gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
418         }
419         goto drop_event;
420       }
421       break;
422     }
423     case GST_EVENT_FLUSH_START:
424       reader->flushing = TRUE;
425       part_pad->flushing = TRUE;
426       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
427           part_pad);
428       gst_data_queue_set_flushing (part_pad->queue, TRUE);
429       SPLITMUX_PART_BROADCAST (reader);
430       break;
431     case GST_EVENT_FLUSH_STOP:{
432       gst_data_queue_set_flushing (part_pad->queue, FALSE);
433       gst_data_queue_flush (part_pad->queue);
434       part_pad->seen_buffer = FALSE;
435       part_pad->flushing = FALSE;
436       part_pad->is_eos = FALSE;
437 
438       reader->flushing = splitmux_is_flushing (reader);
439       GST_LOG_OBJECT (reader,
440           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
441           reader->path, pad, reader->flushing);
442       SPLITMUX_PART_BROADCAST (reader);
443       break;
444     }
445     default:
446       break;
447   }
448 
449   /* Don't send events downstream while preparing */
450   if (reader->prep_state != PART_STATE_READY)
451     goto drop_event;
452 
453   /* Don't pass flush events - those are done by the parent */
454   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
455       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
456     goto drop_event;
457 
458   if (!block_until_can_push (reader))
459     goto drop_event;
460 
461   switch (GST_EVENT_TYPE (event)) {
462     case GST_EVENT_GAP:{
463       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
464       goto drop_event;
465     }
466     default:
467       break;
468   }
469 
470   /* We are active, and one queue is empty, place this buffer in
471    * the dataqueue */
472   gst_object_ref (part_pad->queue);
473   SPLITMUX_PART_UNLOCK (reader);
474 
475   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
476   item = g_slice_new (GstDataQueueItem);
477   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
478   item->object = GST_MINI_OBJECT (event);
479   item->size = 0;
480   item->duration = 0;
481   if (item->duration == GST_CLOCK_TIME_NONE)
482     item->duration = 0;
483   item->visible = FALSE;
484 
485   if (!gst_data_queue_push (part_pad->queue, item)) {
486     splitmux_part_free_queue_item (item);
487     ret = FALSE;
488   }
489 
490   gst_object_unref (part_pad->queue);
491   gst_object_unref (target);
492 
493   return ret;
494 wrong_segment:
495   gst_event_unref (event);
496   gst_object_unref (target);
497   SPLITMUX_PART_UNLOCK (reader);
498   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
499       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
500           reader->path, pad));
501   return FALSE;
502 drop_event:
503   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
504       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
505   gst_event_unref (event);
506   gst_object_unref (target);
507   SPLITMUX_PART_UNLOCK (reader);
508   return TRUE;
509 }
510 
511 static gboolean
splitmux_part_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)512 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
513 {
514   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
515   GstSplitMuxPartReader *reader = part_pad->reader;
516   GstPad *target;
517   gboolean ret = FALSE;
518   gboolean active;
519 
520   SPLITMUX_PART_LOCK (reader);
521   target = gst_object_ref (part_pad->target);
522   active = reader->active;
523   SPLITMUX_PART_UNLOCK (reader);
524 
525   if (active) {
526     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
527         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
528 
529     ret = gst_pad_query (target, query);
530   }
531 
532   gst_object_unref (target);
533 
534   return ret;
535 }
536 
537 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
538 
539 static void
splitmux_part_pad_constructed(GObject * pad)540 splitmux_part_pad_constructed (GObject * pad)
541 {
542   gst_pad_set_chain_function (GST_PAD (pad),
543       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
544   gst_pad_set_event_function (GST_PAD (pad),
545       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
546   gst_pad_set_query_function (GST_PAD (pad),
547       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
548 
549   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
550 }
551 
552 static void
gst_splitmux_part_pad_class_init(GstSplitMuxPartPadClass * klass)553 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
554 {
555   GObjectClass *gobject_klass = (GObjectClass *) (klass);
556 
557   gobject_klass->constructed = splitmux_part_pad_constructed;
558   gobject_klass->finalize = splitmux_part_pad_finalize;
559 }
560 
561 static void
gst_splitmux_part_pad_init(GstSplitMuxPartPad * pad)562 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
563 {
564   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
565       NULL, NULL, pad);
566   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
567   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
568 }
569 
570 static void
splitmux_part_pad_finalize(GObject * obj)571 splitmux_part_pad_finalize (GObject * obj)
572 {
573   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
574 
575   GST_DEBUG_OBJECT (obj, "finalize");
576   gst_data_queue_set_flushing (pad->queue, TRUE);
577   gst_data_queue_flush (pad->queue);
578   gst_object_unref (GST_OBJECT_CAST (pad->queue));
579   pad->queue = NULL;
580 
581   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
582 }
583 
584 static void
585 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
586     GstSplitMuxPartReader * part);
587 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
588 static GstStateChangeReturn
589 gst_splitmux_part_reader_change_state (GstElement * element,
590     GstStateChange transition);
591 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
592     GstEvent * event);
593 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
594     * part, gboolean flushing);
595 static void bus_handler (GstBin * bin, GstMessage * msg);
596 static void splitmux_part_reader_dispose (GObject * object);
597 static void splitmux_part_reader_finalize (GObject * object);
598 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
599 
600 #define gst_splitmux_part_reader_parent_class parent_class
601 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
602     GST_TYPE_PIPELINE);
603 
604 static void
gst_splitmux_part_reader_class_init(GstSplitMuxPartReaderClass * klass)605 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
606 {
607   GObjectClass *gobject_klass = (GObjectClass *) (klass);
608   GstElementClass *gstelement_class = (GstElementClass *) klass;
609   GstBinClass *gstbin_class = (GstBinClass *) klass;
610 
611   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
612       "Split File Demuxing Source helper");
613 
614   gobject_klass->dispose = splitmux_part_reader_dispose;
615   gobject_klass->finalize = splitmux_part_reader_finalize;
616 
617   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
618   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
619 
620   gstbin_class->handle_message = bus_handler;
621 }
622 
623 static void
gst_splitmux_part_reader_init(GstSplitMuxPartReader * reader)624 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
625 {
626   GstElement *typefind;
627 
628   reader->active = FALSE;
629   reader->duration = GST_CLOCK_TIME_NONE;
630 
631   g_cond_init (&reader->inactive_cond);
632   g_mutex_init (&reader->lock);
633   g_mutex_init (&reader->type_lock);
634 
635   /* FIXME: Create elements on a state change */
636   reader->src = gst_element_factory_make ("filesrc", NULL);
637   if (reader->src == NULL) {
638     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
639     return;
640   }
641   gst_bin_add (GST_BIN_CAST (reader), reader->src);
642 
643   typefind = gst_element_factory_make ("typefind", NULL);
644   if (!typefind) {
645     GST_ERROR_OBJECT (reader,
646         "Failed to create typefind element - check your installation");
647     return;
648   }
649 
650   gst_bin_add (GST_BIN_CAST (reader), typefind);
651   reader->typefind = typefind;
652 
653   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
654     GST_ERROR_OBJECT (reader,
655         "Failed to link typefind element - check your installation");
656     return;
657   }
658 
659   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
660       reader);
661 }
662 
663 static void
splitmux_part_reader_dispose(GObject * object)664 splitmux_part_reader_dispose (GObject * object)
665 {
666   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
667 
668   splitmux_part_reader_reset (reader);
669 
670   G_OBJECT_CLASS (parent_class)->dispose (object);
671 }
672 
673 static void
splitmux_part_reader_finalize(GObject * object)674 splitmux_part_reader_finalize (GObject * object)
675 {
676   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
677 
678   g_cond_clear (&reader->inactive_cond);
679   g_mutex_clear (&reader->lock);
680   g_mutex_clear (&reader->type_lock);
681 
682   g_free (reader->path);
683 
684   G_OBJECT_CLASS (parent_class)->finalize (object);
685 }
686 
687 static void
do_async_start(GstSplitMuxPartReader * reader)688 do_async_start (GstSplitMuxPartReader * reader)
689 {
690   GstMessage *message;
691 
692   GST_STATE_LOCK (reader);
693   reader->async_pending = TRUE;
694 
695   message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
696   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
697   GST_STATE_UNLOCK (reader);
698 }
699 
700 static void
do_async_done(GstSplitMuxPartReader * reader)701 do_async_done (GstSplitMuxPartReader * reader)
702 {
703   GstMessage *message;
704 
705   GST_STATE_LOCK (reader);
706   if (reader->async_pending) {
707     message =
708         gst_message_new_async_done (GST_OBJECT_CAST (reader),
709         GST_CLOCK_TIME_NONE);
710     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
711         message);
712 
713     reader->async_pending = FALSE;
714   }
715   GST_STATE_UNLOCK (reader);
716 }
717 
718 static void
splitmux_part_reader_reset(GstSplitMuxPartReader * reader)719 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
720 {
721   GList *cur;
722 
723   SPLITMUX_PART_LOCK (reader);
724   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
725     GstPad *pad = GST_PAD_CAST (cur->data);
726     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
727     gst_object_unref (GST_OBJECT_CAST (pad));
728   }
729 
730   g_list_free (reader->pads);
731   reader->pads = NULL;
732   SPLITMUX_PART_UNLOCK (reader);
733 }
734 
735 static GstSplitMuxPartPad *
gst_splitmux_part_reader_new_proxy_pad(GstSplitMuxPartReader * reader,GstPad * target)736 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
737     GstPad * target)
738 {
739   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
740       "name", GST_PAD_NAME (target),
741       "direction", GST_PAD_SINK,
742       NULL);
743   pad->target = target;
744   pad->reader = reader;
745 
746   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
747 
748   return pad;
749 }
750 
751 static void
new_decoded_pad_added_cb(GstElement * element,GstPad * pad,GstSplitMuxPartReader * reader)752 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
753     GstSplitMuxPartReader * reader)
754 {
755   GstPad *out_pad = NULL;
756   GstSplitMuxPartPad *proxy_pad;
757   GstCaps *caps;
758   GstPadLinkReturn link_ret;
759 
760   caps = gst_pad_get_current_caps (pad);
761 
762   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
763       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
764 
765   gst_caps_unref (caps);
766 
767   /* Look up or create the output pad */
768   if (reader->get_pad_cb)
769     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
770   if (out_pad == NULL) {
771     GST_DEBUG_OBJECT (reader,
772         "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
773     return;
774   }
775 
776   /* Create our proxy pad to interact with this new pad */
777   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
778   GST_DEBUG_OBJECT (reader,
779       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
780       proxy_pad, out_pad);
781 
782   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
783   if (link_ret != GST_PAD_LINK_OK) {
784     gst_object_unref (proxy_pad);
785     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
786         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
787             " ret %d", reader->path, pad, link_ret));
788     return;
789   }
790   GST_DEBUG_OBJECT (reader,
791       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
792       pad, proxy_pad);
793 
794   SPLITMUX_PART_LOCK (reader);
795   reader->pads = g_list_prepend (reader->pads, proxy_pad);
796   SPLITMUX_PART_UNLOCK (reader);
797 }
798 
799 static gboolean
gst_splitmux_part_reader_send_event(GstElement * element,GstEvent * event)800 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
801 {
802   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
803   gboolean ret = FALSE;
804   GstPad *pad = NULL;
805 
806   /* Send event to the first source pad we found */
807   SPLITMUX_PART_LOCK (reader);
808   if (reader->pads) {
809     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
810     pad = gst_pad_get_peer (proxy_pad);
811   }
812   SPLITMUX_PART_UNLOCK (reader);
813 
814   if (pad) {
815     ret = gst_pad_send_event (pad, event);
816     gst_object_unref (pad);
817   } else {
818     gst_event_unref (event);
819   }
820 
821   return ret;
822 }
823 
824 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
825 static void
gst_splitmux_part_reader_seek_to_time_locked(GstSplitMuxPartReader * reader,GstClockTime time)826 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
827     GstClockTime time)
828 {
829   SPLITMUX_PART_UNLOCK (reader);
830   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
831       GST_TIME_ARGS (time));
832   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
833       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
834       GST_SEEK_TYPE_END, 0);
835 
836   SPLITMUX_PART_LOCK (reader);
837 
838   /* Wait for flush to finish, so old data is gone */
839   while (reader->flushing) {
840     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
841     SPLITMUX_PART_WAIT (reader);
842   }
843 }
844 
845 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
846 static gboolean
gst_splitmux_part_reader_seek_to_segment(GstSplitMuxPartReader * reader,GstSegment * target_seg,GstSeekFlags extra_flags)847 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
848     GstSegment * target_seg, GstSeekFlags extra_flags)
849 {
850   GstSeekFlags flags;
851   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
852 
853   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
854 
855   SPLITMUX_PART_LOCK (reader);
856   if (target_seg->start >= reader->start_offset)
857     start = target_seg->start - reader->start_offset;
858   /* If the segment stop is within this part, don't play to the end */
859   if (target_seg->stop != -1 &&
860       target_seg->stop < reader->start_offset + reader->duration)
861     stop = target_seg->stop - reader->start_offset;
862 
863   SPLITMUX_PART_UNLOCK (reader);
864 
865   GST_DEBUG_OBJECT (reader,
866       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
867       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
868       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
869 
870   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
871       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
872       stop);
873 }
874 
875 /* Called with lock held */
876 static void
gst_splitmux_part_reader_measure_streams(GstSplitMuxPartReader * reader)877 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
878 {
879   SPLITMUX_PART_LOCK (reader);
880   /* Trigger a flushing seek to near the end of the file and run each stream
881    * to EOS in order to find the smallest end timestamp to start the next
882    * file from
883    */
884   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
885       && reader->duration > GST_SECOND) {
886     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
887     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
888   }
889   SPLITMUX_PART_UNLOCK (reader);
890 }
891 
892 static void
gst_splitmux_part_reader_finish_measuring_streams(GstSplitMuxPartReader * reader)893 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
894     reader)
895 {
896   SPLITMUX_PART_LOCK (reader);
897   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
898     /* Fire the prepared signal and go to READY state */
899     GST_DEBUG_OBJECT (reader,
900         "Stream measuring complete. File %s is now ready", reader->path);
901     reader->prep_state = PART_STATE_READY;
902     SPLITMUX_PART_UNLOCK (reader);
903     do_async_done (reader);
904   } else {
905     SPLITMUX_PART_UNLOCK (reader);
906   }
907 }
908 
909 static GstElement *
find_demuxer(GstCaps * caps)910 find_demuxer (GstCaps * caps)
911 {
912   GList *factories =
913       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
914       GST_RANK_MARGINAL);
915   GList *compat_elements;
916   GstElement *e = NULL;
917 
918   if (factories == NULL)
919     return NULL;
920 
921   compat_elements =
922       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
923 
924   if (compat_elements) {
925     /* Just take the first (highest ranked) option */
926     GstElementFactory *factory =
927         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
928     e = gst_element_factory_create (factory, NULL);
929     gst_plugin_feature_list_free (compat_elements);
930   }
931 
932   if (factories)
933     gst_plugin_feature_list_free (factories);
934 
935   return e;
936 }
937 
938 static void
type_found(GstElement * typefind,guint probability,GstCaps * caps,GstSplitMuxPartReader * reader)939 type_found (GstElement * typefind, guint probability,
940     GstCaps * caps, GstSplitMuxPartReader * reader)
941 {
942   GstElement *demux;
943 
944   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
945 
946   /* typefind found a type. Look for the demuxer to handle it */
947   demux = reader->demux = find_demuxer (caps);
948   if (reader->demux == NULL) {
949     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
950     return;
951   }
952 
953   /* Connect to demux signals */
954   g_signal_connect (demux,
955       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
956   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
957 
958   gst_element_set_locked_state (demux, TRUE);
959   gst_bin_add (GST_BIN_CAST (reader), demux);
960   gst_element_link_pads (reader->typefind, "src", demux, NULL);
961   gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
962   gst_element_set_locked_state (demux, FALSE);
963 }
964 
965 static void
check_if_pads_collected(GstSplitMuxPartReader * reader)966 check_if_pads_collected (GstSplitMuxPartReader * reader)
967 {
968   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
969     /* Check we have all pads and each pad has seen a buffer */
970     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
971       GST_DEBUG_OBJECT (reader,
972           "no more pads - file %s. Measuring stream length", reader->path);
973       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
974       gst_element_call_async (GST_ELEMENT_CAST (reader),
975           (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
976           NULL, NULL);
977     }
978   }
979 }
980 
981 static void
no_more_pads(GstElement * element,GstSplitMuxPartReader * reader)982 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
983 {
984   GstClockTime duration = GST_CLOCK_TIME_NONE;
985   GList *cur;
986   /* Query the minimum duration of any pad in this piece and store it.
987    * FIXME: Only consider audio and video */
988   SPLITMUX_PART_LOCK (reader);
989   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
990     GstPad *target = GST_PAD_CAST (cur->data);
991     if (target) {
992       gint64 cur_duration;
993       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
994         GST_INFO_OBJECT (reader,
995             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
996             reader->path, target, GST_TIME_ARGS (cur_duration));
997         if (cur_duration < duration)
998           duration = cur_duration;
999       }
1000     }
1001   }
1002   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
1003       reader->path, GST_TIME_ARGS (duration));
1004   reader->duration = (GstClockTime) duration;
1005 
1006   reader->no_more_pads = TRUE;
1007 
1008   check_if_pads_collected (reader);
1009   SPLITMUX_PART_UNLOCK (reader);
1010 }
1011 
1012 gboolean
gst_splitmux_part_reader_src_query(GstSplitMuxPartReader * part,GstPad * src_pad,GstQuery * query)1013 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
1014     GstPad * src_pad, GstQuery * query)
1015 {
1016   GstPad *target = NULL;
1017   gboolean ret;
1018   GList *cur;
1019 
1020   SPLITMUX_PART_LOCK (part);
1021   /* Find the pad corresponding to the visible output target pad */
1022   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
1023     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1024     if (part_pad->target == src_pad) {
1025       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
1026       break;
1027     }
1028   }
1029   SPLITMUX_PART_UNLOCK (part);
1030 
1031   if (target == NULL)
1032     return FALSE;
1033 
1034   ret = gst_pad_peer_query (target, query);
1035 
1036   if (ret == FALSE)
1037     goto out;
1038 
1039   /* Post-massaging of queries */
1040   switch (GST_QUERY_TYPE (query)) {
1041     case GST_QUERY_POSITION:{
1042       GstFormat fmt;
1043       gint64 position;
1044 
1045       gst_query_parse_position (query, &fmt, &position);
1046       if (fmt != GST_FORMAT_TIME)
1047         return FALSE;
1048       SPLITMUX_PART_LOCK (part);
1049       position += part->start_offset;
1050       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1051           GST_TIME_ARGS (position));
1052       SPLITMUX_PART_UNLOCK (part);
1053 
1054       gst_query_set_position (query, fmt, position);
1055       break;
1056     }
1057     default:
1058       break;
1059   }
1060 
1061 out:
1062   gst_object_unref (target);
1063   return ret;
1064 }
1065 
1066 static GstStateChangeReturn
gst_splitmux_part_reader_change_state(GstElement * element,GstStateChange transition)1067 gst_splitmux_part_reader_change_state (GstElement * element,
1068     GstStateChange transition)
1069 {
1070   GstStateChangeReturn ret;
1071   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1072 
1073   switch (transition) {
1074     case GST_STATE_CHANGE_NULL_TO_READY:{
1075       break;
1076     }
1077     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1078       SPLITMUX_PART_LOCK (reader);
1079       g_object_set (reader->src, "location", reader->path, NULL);
1080       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1081       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1082       reader->running = TRUE;
1083       SPLITMUX_PART_UNLOCK (reader);
1084 
1085       /* we go to PAUSED asynchronously once all streams have been collected
1086        * and seeks to measure the stream lengths are done */
1087       do_async_start (reader);
1088       break;
1089     }
1090     case GST_STATE_CHANGE_READY_TO_NULL:
1091     case GST_STATE_CHANGE_PAUSED_TO_READY:
1092       SPLITMUX_PART_LOCK (reader);
1093       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1094       reader->running = FALSE;
1095       SPLITMUX_PART_BROADCAST (reader);
1096       SPLITMUX_PART_UNLOCK (reader);
1097       break;
1098     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1099       SPLITMUX_PART_LOCK (reader);
1100       reader->active = FALSE;
1101       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1102       SPLITMUX_PART_BROADCAST (reader);
1103       SPLITMUX_PART_UNLOCK (reader);
1104       break;
1105     default:
1106       break;
1107   }
1108 
1109   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1110   if (ret == GST_STATE_CHANGE_FAILURE) {
1111     do_async_done (reader);
1112     goto beach;
1113   }
1114 
1115   switch (transition) {
1116     case GST_STATE_CHANGE_READY_TO_PAUSED:
1117       ret = GST_STATE_CHANGE_ASYNC;
1118       break;
1119     case GST_STATE_CHANGE_PAUSED_TO_READY:
1120       do_async_done (reader);
1121       break;
1122     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1123       SPLITMUX_PART_LOCK (reader);
1124       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1125       reader->active = TRUE;
1126       SPLITMUX_PART_BROADCAST (reader);
1127       SPLITMUX_PART_UNLOCK (reader);
1128       break;
1129     case GST_STATE_CHANGE_READY_TO_NULL:
1130       reader->prep_state = PART_STATE_NULL;
1131       splitmux_part_reader_reset (reader);
1132       break;
1133     default:
1134       break;
1135   }
1136 
1137 beach:
1138   return ret;
1139 }
1140 
1141 gboolean
gst_splitmux_part_reader_prepare(GstSplitMuxPartReader * part)1142 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1143 {
1144   GstStateChangeReturn ret;
1145 
1146   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1147 
1148   if (ret == GST_STATE_CHANGE_FAILURE)
1149     return FALSE;
1150 
1151   return TRUE;
1152 }
1153 
1154 void
gst_splitmux_part_reader_unprepare(GstSplitMuxPartReader * part)1155 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1156 {
1157   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1158 }
1159 
1160 void
gst_splitmux_part_reader_set_location(GstSplitMuxPartReader * reader,const gchar * path)1161 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1162     const gchar * path)
1163 {
1164   reader->path = g_strdup (path);
1165 }
1166 
1167 gboolean
gst_splitmux_part_reader_activate(GstSplitMuxPartReader * reader,GstSegment * seg,GstSeekFlags extra_flags)1168 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1169     GstSegment * seg, GstSeekFlags extra_flags)
1170 {
1171   GST_DEBUG_OBJECT (reader, "Activating part reader");
1172 
1173   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
1174     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1175         seg);
1176     return FALSE;
1177   }
1178   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1179           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1180     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1181     return FALSE;
1182   }
1183   return TRUE;
1184 }
1185 
1186 gboolean
gst_splitmux_part_reader_is_active(GstSplitMuxPartReader * part)1187 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1188 {
1189   gboolean ret;
1190 
1191   SPLITMUX_PART_LOCK (part);
1192   ret = part->active;
1193   SPLITMUX_PART_UNLOCK (part);
1194 
1195   return ret;
1196 }
1197 
1198 void
gst_splitmux_part_reader_deactivate(GstSplitMuxPartReader * reader)1199 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1200 {
1201   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1202   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1203 }
1204 
1205 void
gst_splitmux_part_reader_set_flushing_locked(GstSplitMuxPartReader * reader,gboolean flushing)1206 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1207     gboolean flushing)
1208 {
1209   GList *cur;
1210 
1211   GST_LOG_OBJECT (reader, "%s dataqueues",
1212       flushing ? "Flushing" : "Done flushing");
1213   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1214     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1215     gst_data_queue_set_flushing (part_pad->queue, flushing);
1216     if (flushing)
1217       gst_data_queue_flush (part_pad->queue);
1218   }
1219 };
1220 
1221 void
gst_splitmux_part_reader_set_callbacks(GstSplitMuxPartReader * reader,gpointer cb_data,GstSplitMuxPartReaderPadCb get_pad_cb)1222 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1223     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1224 {
1225   reader->cb_data = cb_data;
1226   reader->get_pad_cb = get_pad_cb;
1227 }
1228 
1229 GstClockTime
gst_splitmux_part_reader_get_end_offset(GstSplitMuxPartReader * reader)1230 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1231 {
1232   GList *cur;
1233   GstClockTime ret = GST_CLOCK_TIME_NONE;
1234 
1235   SPLITMUX_PART_LOCK (reader);
1236   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1237     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1238     if (!part_pad->is_sparse && part_pad->max_ts < ret)
1239       ret = part_pad->max_ts;
1240   }
1241 
1242   SPLITMUX_PART_UNLOCK (reader);
1243 
1244   return ret;
1245 }
1246 
1247 void
gst_splitmux_part_reader_set_start_offset(GstSplitMuxPartReader * reader,GstClockTime offset)1248 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1249     GstClockTime offset)
1250 {
1251   SPLITMUX_PART_LOCK (reader);
1252   reader->start_offset = offset;
1253   GST_INFO_OBJECT (reader, "TS offset now %" GST_TIME_FORMAT,
1254       GST_TIME_ARGS (offset));
1255   SPLITMUX_PART_UNLOCK (reader);
1256 }
1257 
1258 GstClockTime
gst_splitmux_part_reader_get_start_offset(GstSplitMuxPartReader * reader)1259 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1260 {
1261   GstClockTime ret = GST_CLOCK_TIME_NONE;
1262 
1263   SPLITMUX_PART_LOCK (reader);
1264   ret = reader->start_offset;
1265   SPLITMUX_PART_UNLOCK (reader);
1266 
1267   return ret;
1268 }
1269 
1270 GstClockTime
gst_splitmux_part_reader_get_duration(GstSplitMuxPartReader * reader)1271 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1272 {
1273   GstClockTime dur;
1274 
1275   SPLITMUX_PART_LOCK (reader);
1276   dur = reader->duration;
1277   SPLITMUX_PART_UNLOCK (reader);
1278 
1279   return dur;
1280 }
1281 
1282 GstPad *
gst_splitmux_part_reader_lookup_pad(GstSplitMuxPartReader * reader,GstPad * target)1283 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1284     GstPad * target)
1285 {
1286   GstPad *result = NULL;
1287   GList *cur;
1288 
1289   SPLITMUX_PART_LOCK (reader);
1290   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1291     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1292     if (part_pad->target == target) {
1293       result = (GstPad *) gst_object_ref (part_pad);
1294       break;
1295     }
1296   }
1297   SPLITMUX_PART_UNLOCK (reader);
1298 
1299   return result;
1300 }
1301 
1302 GstFlowReturn
gst_splitmux_part_reader_pop(GstSplitMuxPartReader * reader,GstPad * pad,GstDataQueueItem ** item)1303 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1304     GstDataQueueItem ** item)
1305 {
1306   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1307   GstDataQueue *q;
1308   GstFlowReturn ret;
1309 
1310   /* Get one item from the appropriate dataqueue */
1311   SPLITMUX_PART_LOCK (reader);
1312   if (reader->prep_state == PART_STATE_FAILED) {
1313     SPLITMUX_PART_UNLOCK (reader);
1314     return GST_FLOW_ERROR;
1315   }
1316 
1317   q = gst_object_ref (part_pad->queue);
1318 
1319   /* Have to drop the lock around pop, so we can be woken up for flush */
1320   SPLITMUX_PART_UNLOCK (reader);
1321   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1322     ret = GST_FLOW_FLUSHING;
1323     goto out;
1324   }
1325 
1326   SPLITMUX_PART_LOCK (reader);
1327 
1328   SPLITMUX_PART_BROADCAST (reader);
1329   if (GST_IS_EVENT ((*item)->object)) {
1330     GstEvent *e = (GstEvent *) ((*item)->object);
1331     /* Mark this pad as EOS */
1332     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1333       part_pad->is_eos = TRUE;
1334   }
1335 
1336   SPLITMUX_PART_UNLOCK (reader);
1337 
1338   ret = GST_FLOW_OK;
1339 out:
1340   gst_object_unref (q);
1341   return ret;
1342 }
1343 
1344 static void
bus_handler(GstBin * bin,GstMessage * message)1345 bus_handler (GstBin * bin, GstMessage * message)
1346 {
1347   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1348 
1349   switch (GST_MESSAGE_TYPE (message)) {
1350     case GST_MESSAGE_ERROR:
1351       /* Make sure to set the state to failed and wake up the listener
1352        * on error */
1353       SPLITMUX_PART_LOCK (reader);
1354       GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
1355           " marking this reader as failed", GST_MESSAGE_SRC (message));
1356       reader->prep_state = PART_STATE_FAILED;
1357       SPLITMUX_PART_BROADCAST (reader);
1358       SPLITMUX_PART_UNLOCK (reader);
1359       do_async_done (reader);
1360       break;
1361     default:
1362       break;
1363   }
1364 
1365   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1366 }
1367