• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer Split Demuxer bin that recombines files created by
2  * the splitmuxsink element.
3  *
4  * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 #include <string.h>
27 #include "gstsplitmuxsrc.h"
28 
29 GST_DEBUG_CATEGORY_STATIC (splitmux_part_debug);
30 #define GST_CAT_DEFAULT splitmux_part_debug
31 
32 #define SPLITMUX_PART_LOCK(p) g_mutex_lock(&(p)->lock)
33 #define SPLITMUX_PART_UNLOCK(p) g_mutex_unlock(&(p)->lock)
34 #define SPLITMUX_PART_WAIT(p) g_cond_wait (&(p)->inactive_cond, &(p)->lock)
35 #define SPLITMUX_PART_BROADCAST(p) g_cond_broadcast (&(p)->inactive_cond)
36 
37 #define SPLITMUX_PART_TYPE_LOCK(p) g_mutex_lock(&(p)->type_lock)
38 #define SPLITMUX_PART_TYPE_UNLOCK(p) g_mutex_unlock(&(p)->type_lock)
39 
40 #define SPLITMUX_PART_MSG_LOCK(p) g_mutex_lock(&(p)->msg_lock)
41 #define SPLITMUX_PART_MSG_UNLOCK(p) g_mutex_unlock(&(p)->msg_lock)
42 
43 typedef struct _GstSplitMuxPartPad
44 {
45   GstPad parent;
46 
47   /* Reader we belong to */
48   GstSplitMuxPartReader *reader;
49   /* Output splitmuxsrc source pad */
50   GstPad *target;
51 
52   GstDataQueue *queue;
53 
54   gboolean is_eos;
55   gboolean flushing;
56   gboolean seen_buffer;
57 
58   gboolean is_sparse;
59   GstClockTime max_ts;
60   GstSegment segment;
61 
62   GstSegment orig_segment;
63   GstClockTime initial_ts_offset;
64 } GstSplitMuxPartPad;
65 
66 typedef struct _GstSplitMuxPartPadClass
67 {
68   GstPadClass parent;
69 } GstSplitMuxPartPadClass;
70 
71 static GType gst_splitmux_part_pad_get_type (void);
72 #define SPLITMUX_TYPE_PART_PAD gst_splitmux_part_pad_get_type()
73 #define SPLITMUX_PART_PAD_CAST(p) ((GstSplitMuxPartPad *)(p))
74 
75 static void splitmux_part_pad_constructed (GObject * pad);
76 static void splitmux_part_pad_finalize (GObject * pad);
77 static void handle_buffer_measuring (GstSplitMuxPartReader * reader,
78     GstSplitMuxPartPad * part_pad, GstBuffer * buf);
79 
80 static gboolean splitmux_data_queue_is_full_cb (GstDataQueue * queue,
81     guint visible, guint bytes, guint64 time, gpointer checkdata);
82 static void type_found (GstElement * typefind, guint probability,
83     GstCaps * caps, GstSplitMuxPartReader * reader);
84 static void check_if_pads_collected (GstSplitMuxPartReader * reader);
85 
86 static void
87 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
88     reader);
89 
90 /* Called with reader lock held */
91 static gboolean
have_empty_queue(GstSplitMuxPartReader * reader)92 have_empty_queue (GstSplitMuxPartReader * reader)
93 {
94   GList *cur;
95 
96   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
97     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
98     if (part_pad->is_eos) {
99       GST_LOG_OBJECT (part_pad, "Pad is EOS");
100       return TRUE;
101     }
102     if (gst_data_queue_is_empty (part_pad->queue)) {
103       GST_LOG_OBJECT (part_pad, "Queue is empty");
104       return TRUE;
105     }
106   }
107 
108   return FALSE;
109 }
110 
111 /* Called with reader lock held */
112 static gboolean
block_until_can_push(GstSplitMuxPartReader * reader)113 block_until_can_push (GstSplitMuxPartReader * reader)
114 {
115   while (reader->running) {
116     if (reader->flushing)
117       goto out;
118     if (reader->active && have_empty_queue (reader))
119       goto out;
120 
121     GST_LOG_OBJECT (reader,
122         "Waiting for activation or empty queue on reader %s", reader->path);
123     SPLITMUX_PART_WAIT (reader);
124   }
125 
126   GST_LOG_OBJECT (reader, "Done waiting on reader %s active %d flushing %d",
127       reader->path, reader->active, reader->flushing);
128 out:
129   return reader->active && !reader->flushing;
130 }
131 
132 static void
handle_buffer_measuring(GstSplitMuxPartReader * reader,GstSplitMuxPartPad * part_pad,GstBuffer * buf)133 handle_buffer_measuring (GstSplitMuxPartReader * reader,
134     GstSplitMuxPartPad * part_pad, GstBuffer * buf)
135 {
136   GstClockTimeDiff ts = GST_CLOCK_STIME_NONE;
137   GstClockTimeDiff offset;
138 
139   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS &&
140       !part_pad->seen_buffer) {
141     /* If this is the first buffer on the pad in the collect_streams state,
142      * then calculate initial offset based on running time of this segment */
143     part_pad->initial_ts_offset =
144         part_pad->orig_segment.start + part_pad->orig_segment.base -
145         part_pad->orig_segment.time;
146     GST_DEBUG_OBJECT (reader,
147         "Initial TS offset for pad %" GST_PTR_FORMAT " now %" GST_TIME_FORMAT,
148         part_pad, GST_TIME_ARGS (part_pad->initial_ts_offset));
149   }
150   part_pad->seen_buffer = TRUE;
151 
152   /* Adjust buffer timestamps */
153   offset = reader->start_offset + part_pad->segment.base;
154   offset -= part_pad->initial_ts_offset;
155   /* We don't add the ts_offset here, because we
156    * want to measure the logical length of the stream,
157    * not to generate output timestamps */
158 
159   /* Update the stored max duration on the pad,
160    * always preferring making DTS contiguous
161    * where possible */
162   if (GST_BUFFER_DTS_IS_VALID (buf))
163     ts = GST_BUFFER_DTS (buf) + offset;
164   else if (GST_BUFFER_PTS_IS_VALID (buf))
165     ts = GST_BUFFER_PTS (buf) + offset;
166 
167   GST_DEBUG_OBJECT (reader, "Pad %" GST_PTR_FORMAT
168       " incoming DTS %" GST_TIME_FORMAT
169       " PTS %" GST_TIME_FORMAT " offset by %" GST_STIME_FORMAT
170       " to %" GST_STIME_FORMAT, part_pad,
171       GST_TIME_ARGS (GST_BUFFER_DTS (buf)),
172       GST_TIME_ARGS (GST_BUFFER_PTS (buf)),
173       GST_STIME_ARGS (offset), GST_STIME_ARGS (ts));
174 
175   if (GST_CLOCK_STIME_IS_VALID (ts)) {
176     if (GST_BUFFER_DURATION_IS_VALID (buf))
177       ts += GST_BUFFER_DURATION (buf);
178 
179     if (GST_CLOCK_STIME_IS_VALID (ts)
180         && ts > (GstClockTimeDiff) part_pad->max_ts) {
181       part_pad->max_ts = ts;
182       GST_LOG_OBJECT (reader,
183           "pad %" GST_PTR_FORMAT " max TS now %" GST_TIME_FORMAT, part_pad,
184           GST_TIME_ARGS (part_pad->max_ts));
185     }
186   }
187   /* Is it time to move to measuring state yet? */
188   check_if_pads_collected (reader);
189 }
190 
191 static gboolean
splitmux_data_queue_is_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)192 splitmux_data_queue_is_full_cb (GstDataQueue * queue,
193     guint visible, guint bytes, guint64 time, gpointer checkdata)
194 {
195   /* Arbitrary safety limit. If we hit it, playback is likely to stall */
196   if (time > 20 * GST_SECOND)
197     return TRUE;
198   return FALSE;
199 }
200 
201 static void
splitmux_part_free_queue_item(GstDataQueueItem * item)202 splitmux_part_free_queue_item (GstDataQueueItem * item)
203 {
204   gst_mini_object_unref (item->object);
205   g_slice_free (GstDataQueueItem, item);
206 }
207 
208 static GstFlowReturn
splitmux_part_pad_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)209 splitmux_part_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
210 {
211   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
212   GstSplitMuxPartReader *reader = part_pad->reader;
213   GstDataQueueItem *item;
214   GstClockTimeDiff offset;
215 
216   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " %" GST_PTR_FORMAT, pad, buf);
217   SPLITMUX_PART_LOCK (reader);
218 
219   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
220       reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
221     handle_buffer_measuring (reader, part_pad, buf);
222     gst_buffer_unref (buf);
223     SPLITMUX_PART_UNLOCK (reader);
224     return GST_FLOW_OK;
225   }
226 
227   if (!block_until_can_push (reader)) {
228     /* Flushing */
229     SPLITMUX_PART_UNLOCK (reader);
230     gst_buffer_unref (buf);
231     return GST_FLOW_FLUSHING;
232   }
233 
234   /* Adjust buffer timestamps */
235   offset = reader->start_offset + part_pad->segment.base;
236   offset -= part_pad->initial_ts_offset;
237   offset += reader->ts_offset;
238 
239   if (GST_BUFFER_PTS_IS_VALID (buf))
240     GST_BUFFER_PTS (buf) += offset;
241   if (GST_BUFFER_DTS_IS_VALID (buf))
242     GST_BUFFER_DTS (buf) += offset;
243 
244   /* We are active, and one queue is empty, place this buffer in
245    * the dataqueue */
246   GST_LOG_OBJECT (reader, "Enqueueing buffer %" GST_PTR_FORMAT, buf);
247   item = g_slice_new (GstDataQueueItem);
248   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
249   item->object = GST_MINI_OBJECT (buf);
250   item->size = gst_buffer_get_size (buf);
251   item->duration = GST_BUFFER_DURATION (buf);
252   if (item->duration == GST_CLOCK_TIME_NONE)
253     item->duration = 0;
254   item->visible = TRUE;
255 
256   gst_object_ref (part_pad);
257 
258   SPLITMUX_PART_UNLOCK (reader);
259 
260   if (!gst_data_queue_push (part_pad->queue, item)) {
261     splitmux_part_free_queue_item (item);
262     gst_object_unref (part_pad);
263     return GST_FLOW_FLUSHING;
264   }
265 
266   gst_object_unref (part_pad);
267   return GST_FLOW_OK;
268 }
269 
270 /* Called with splitmux part lock held */
271 static gboolean
splitmux_part_is_eos_locked(GstSplitMuxPartReader * part)272 splitmux_part_is_eos_locked (GstSplitMuxPartReader * part)
273 {
274   GList *cur;
275   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
276     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
277     if (!part_pad->is_eos)
278       return FALSE;
279   }
280 
281   return TRUE;
282 }
283 
284 static gboolean
splitmux_part_is_prerolled_locked(GstSplitMuxPartReader * part)285 splitmux_part_is_prerolled_locked (GstSplitMuxPartReader * part)
286 {
287   GList *cur;
288   GST_LOG_OBJECT (part, "Checking for preroll");
289   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
290     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
291     if (!part_pad->seen_buffer) {
292       GST_LOG_OBJECT (part, "Part pad %" GST_PTR_FORMAT " is not prerolled",
293           part_pad);
294       return FALSE;
295     }
296   }
297   GST_LOG_OBJECT (part, "Part is prerolled");
298   return TRUE;
299 }
300 
301 
302 gboolean
gst_splitmux_part_is_eos(GstSplitMuxPartReader * reader)303 gst_splitmux_part_is_eos (GstSplitMuxPartReader * reader)
304 {
305   gboolean res;
306 
307   SPLITMUX_PART_LOCK (reader);
308   res = splitmux_part_is_eos_locked (reader);
309   SPLITMUX_PART_UNLOCK (reader);
310 
311   return res;
312 }
313 
314 /* Called with splitmux part lock held */
315 static gboolean
splitmux_is_flushing(GstSplitMuxPartReader * reader)316 splitmux_is_flushing (GstSplitMuxPartReader * reader)
317 {
318   GList *cur;
319   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
320     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
321     if (part_pad->flushing)
322       return TRUE;
323   }
324 
325   return FALSE;
326 }
327 
328 static gboolean
splitmux_part_pad_event(GstPad * pad,GstObject * parent,GstEvent * event)329 splitmux_part_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
330 {
331   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
332   GstSplitMuxPartReader *reader = part_pad->reader;
333   gboolean ret = TRUE;
334   SplitMuxSrcPad *target;
335   GstDataQueueItem *item;
336 
337   SPLITMUX_PART_LOCK (reader);
338 
339   target = gst_object_ref (part_pad->target);
340 
341   GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " event %" GST_PTR_FORMAT, pad,
342       event);
343 
344   if (part_pad->flushing && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
345     goto drop_event;
346 
347   switch (GST_EVENT_TYPE (event)) {
348     case GST_EVENT_STREAM_START:{
349       GstStreamFlags flags;
350       gst_event_parse_stream_flags (event, &flags);
351       part_pad->is_sparse = (flags & GST_STREAM_FLAG_SPARSE);
352       break;
353     }
354     case GST_EVENT_SEGMENT:{
355       GstSegment *seg = &part_pad->segment;
356 
357       GST_LOG_OBJECT (pad, "Received segment %" GST_PTR_FORMAT, event);
358 
359       gst_event_copy_segment (event, seg);
360       gst_event_copy_segment (event, &part_pad->orig_segment);
361 
362       if (seg->format != GST_FORMAT_TIME)
363         goto wrong_segment;
364 
365       /* Adjust segment */
366       /* Adjust start/stop so the overall file is 0 + start_offset based,
367        * adding a fixed offset so that DTS is never negative */
368       if (seg->stop != -1) {
369         seg->stop -= seg->start;
370         seg->stop += seg->time + reader->start_offset + reader->ts_offset;
371       }
372       seg->start = seg->time + reader->start_offset + reader->ts_offset;
373       seg->time += reader->start_offset;
374       seg->position += reader->start_offset;
375 
376       /* Replace event */
377       gst_event_unref (event);
378       event = gst_event_new_segment (seg);
379 
380       GST_LOG_OBJECT (pad, "Adjusted segment now %" GST_PTR_FORMAT, event);
381 
382       if (reader->prep_state != PART_STATE_PREPARING_COLLECT_STREAMS
383           && reader->prep_state != PART_STATE_PREPARING_MEASURE_STREAMS)
384         break;                  /* Only do further stuff with segments during initial measuring */
385 
386       /* Take the first segment from the first part */
387       if (target->segment.format == GST_FORMAT_UNDEFINED) {
388         gst_segment_copy_into (seg, &target->segment);
389         GST_DEBUG_OBJECT (reader,
390             "Target pad segment now %" GST_SEGMENT_FORMAT, &target->segment);
391       }
392 
393       if (seg->stop != -1 && target->segment.stop != -1) {
394         GstClockTime stop = seg->base + seg->stop;
395         if (stop > target->segment.stop) {
396           target->segment.stop = stop;
397           GST_DEBUG_OBJECT (reader,
398               "Adjusting segment stop by %" GST_TIME_FORMAT
399               " output now %" GST_SEGMENT_FORMAT,
400               GST_TIME_ARGS (reader->start_offset), &target->segment);
401         }
402       }
403       GST_LOG_OBJECT (pad, "Forwarding segment %" GST_PTR_FORMAT, event);
404       break;
405     }
406     case GST_EVENT_EOS:{
407 
408       GST_DEBUG_OBJECT (part_pad,
409           "State %u EOS event. MaxTS seen %" GST_TIME_FORMAT,
410           reader->prep_state, GST_TIME_ARGS (part_pad->max_ts));
411 
412       if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS ||
413           reader->prep_state == PART_STATE_PREPARING_MEASURE_STREAMS) {
414         /* Mark this pad as EOS */
415         part_pad->is_eos = TRUE;
416         if (splitmux_part_is_eos_locked (reader)) {
417           /* Finished measuring things, set state and tell the state change func
418            * so it can seek back to the start */
419           GST_LOG_OBJECT (reader,
420               "EOS while measuring streams. Resetting for ready");
421           reader->prep_state = PART_STATE_PREPARING_RESET_FOR_READY;
422 
423           gst_element_call_async (GST_ELEMENT_CAST (reader),
424               (GstElementCallAsyncFunc)
425               gst_splitmux_part_reader_finish_measuring_streams, NULL, NULL);
426         }
427         goto drop_event;
428       }
429       break;
430     }
431     case GST_EVENT_FLUSH_START:
432       reader->flushing = TRUE;
433       part_pad->flushing = TRUE;
434       GST_LOG_OBJECT (reader, "Pad %" GST_PTR_FORMAT " flushing dataqueue",
435           part_pad);
436       gst_data_queue_set_flushing (part_pad->queue, TRUE);
437       SPLITMUX_PART_BROADCAST (reader);
438       break;
439     case GST_EVENT_FLUSH_STOP:{
440       gst_data_queue_set_flushing (part_pad->queue, FALSE);
441       gst_data_queue_flush (part_pad->queue);
442       part_pad->seen_buffer = FALSE;
443       part_pad->flushing = FALSE;
444       part_pad->is_eos = FALSE;
445 
446       reader->flushing = splitmux_is_flushing (reader);
447       GST_LOG_OBJECT (reader,
448           "%s pad %" GST_PTR_FORMAT " flush_stop. Overall flushing=%d",
449           reader->path, pad, reader->flushing);
450       SPLITMUX_PART_BROADCAST (reader);
451       break;
452     }
453     default:
454       break;
455   }
456 
457   /* Don't send events downstream while preparing */
458   if (reader->prep_state != PART_STATE_READY)
459     goto drop_event;
460 
461   /* Don't pass flush events - those are done by the parent */
462   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START ||
463       GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
464     goto drop_event;
465 
466   if (!block_until_can_push (reader))
467     goto drop_event;
468 
469   switch (GST_EVENT_TYPE (event)) {
470     case GST_EVENT_GAP:{
471       /* FIXME: Drop initial gap (if any) in each segment, not all GAPs */
472       goto drop_event;
473     }
474     default:
475       break;
476   }
477 
478   /* We are active, and one queue is empty, place this buffer in
479    * the dataqueue */
480   gst_object_ref (part_pad->queue);
481   SPLITMUX_PART_UNLOCK (reader);
482 
483   GST_LOG_OBJECT (reader, "Enqueueing event %" GST_PTR_FORMAT, event);
484   item = g_slice_new (GstDataQueueItem);
485   item->destroy = (GDestroyNotify) splitmux_part_free_queue_item;
486   item->object = GST_MINI_OBJECT (event);
487   item->size = 0;
488   item->duration = 0;
489   if (item->duration == GST_CLOCK_TIME_NONE)
490     item->duration = 0;
491   item->visible = FALSE;
492 
493   if (!gst_data_queue_push (part_pad->queue, item)) {
494     splitmux_part_free_queue_item (item);
495     ret = FALSE;
496   }
497 
498   gst_object_unref (part_pad->queue);
499   gst_object_unref (target);
500 
501   return ret;
502 wrong_segment:
503   gst_event_unref (event);
504   gst_object_unref (target);
505   SPLITMUX_PART_UNLOCK (reader);
506   GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
507       ("Received non-time segment - reader %s pad %" GST_PTR_FORMAT,
508           reader->path, pad));
509   return FALSE;
510 drop_event:
511   GST_LOG_OBJECT (pad, "Dropping event %" GST_PTR_FORMAT
512       " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, event, pad, target);
513   gst_event_unref (event);
514   gst_object_unref (target);
515   SPLITMUX_PART_UNLOCK (reader);
516   return TRUE;
517 }
518 
519 static gboolean
splitmux_part_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)520 splitmux_part_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
521 {
522   GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (pad);
523   GstSplitMuxPartReader *reader = part_pad->reader;
524   GstPad *target;
525   gboolean ret = FALSE;
526   gboolean active;
527 
528   SPLITMUX_PART_LOCK (reader);
529   target = gst_object_ref (part_pad->target);
530   active = reader->active;
531   SPLITMUX_PART_UNLOCK (reader);
532 
533   if (active) {
534     GST_LOG_OBJECT (pad, "Forwarding query %" GST_PTR_FORMAT
535         " from %" GST_PTR_FORMAT " on %" GST_PTR_FORMAT, query, pad, target);
536 
537     ret = gst_pad_query (target, query);
538   }
539 
540   gst_object_unref (target);
541 
542   return ret;
543 }
544 
545 G_DEFINE_TYPE (GstSplitMuxPartPad, gst_splitmux_part_pad, GST_TYPE_PAD);
546 
547 static void
splitmux_part_pad_constructed(GObject * pad)548 splitmux_part_pad_constructed (GObject * pad)
549 {
550   gst_pad_set_chain_function (GST_PAD (pad),
551       GST_DEBUG_FUNCPTR (splitmux_part_pad_chain));
552   gst_pad_set_event_function (GST_PAD (pad),
553       GST_DEBUG_FUNCPTR (splitmux_part_pad_event));
554   gst_pad_set_query_function (GST_PAD (pad),
555       GST_DEBUG_FUNCPTR (splitmux_part_pad_query));
556 
557   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->constructed (pad);
558 }
559 
560 static void
gst_splitmux_part_pad_class_init(GstSplitMuxPartPadClass * klass)561 gst_splitmux_part_pad_class_init (GstSplitMuxPartPadClass * klass)
562 {
563   GObjectClass *gobject_klass = (GObjectClass *) (klass);
564 
565   gobject_klass->constructed = splitmux_part_pad_constructed;
566   gobject_klass->finalize = splitmux_part_pad_finalize;
567 }
568 
569 static void
gst_splitmux_part_pad_init(GstSplitMuxPartPad * pad)570 gst_splitmux_part_pad_init (GstSplitMuxPartPad * pad)
571 {
572   pad->queue = gst_data_queue_new (splitmux_data_queue_is_full_cb,
573       NULL, NULL, pad);
574   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
575   gst_segment_init (&pad->orig_segment, GST_FORMAT_UNDEFINED);
576 }
577 
578 static void
splitmux_part_pad_finalize(GObject * obj)579 splitmux_part_pad_finalize (GObject * obj)
580 {
581   GstSplitMuxPartPad *pad = (GstSplitMuxPartPad *) (obj);
582 
583   GST_DEBUG_OBJECT (obj, "finalize");
584   gst_data_queue_set_flushing (pad->queue, TRUE);
585   gst_data_queue_flush (pad->queue);
586   gst_object_unref (GST_OBJECT_CAST (pad->queue));
587   pad->queue = NULL;
588 
589   G_OBJECT_CLASS (gst_splitmux_part_pad_parent_class)->finalize (obj);
590 }
591 
592 static void
593 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
594     GstSplitMuxPartReader * part);
595 static void no_more_pads (GstElement * element, GstSplitMuxPartReader * reader);
596 static GstStateChangeReturn
597 gst_splitmux_part_reader_change_state (GstElement * element,
598     GstStateChange transition);
599 static gboolean gst_splitmux_part_reader_send_event (GstElement * element,
600     GstEvent * event);
601 static void gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader
602     * part, gboolean flushing);
603 static void bus_handler (GstBin * bin, GstMessage * msg);
604 static void splitmux_part_reader_dispose (GObject * object);
605 static void splitmux_part_reader_finalize (GObject * object);
606 static void splitmux_part_reader_reset (GstSplitMuxPartReader * reader);
607 
608 #define gst_splitmux_part_reader_parent_class parent_class
609 G_DEFINE_TYPE (GstSplitMuxPartReader, gst_splitmux_part_reader,
610     GST_TYPE_PIPELINE);
611 
612 static void
gst_splitmux_part_reader_class_init(GstSplitMuxPartReaderClass * klass)613 gst_splitmux_part_reader_class_init (GstSplitMuxPartReaderClass * klass)
614 {
615   GObjectClass *gobject_klass = (GObjectClass *) (klass);
616   GstElementClass *gstelement_class = (GstElementClass *) klass;
617   GstBinClass *gstbin_class = (GstBinClass *) klass;
618 
619   GST_DEBUG_CATEGORY_INIT (splitmux_part_debug, "splitmuxpartreader", 0,
620       "Split File Demuxing Source helper");
621 
622   gobject_klass->dispose = splitmux_part_reader_dispose;
623   gobject_klass->finalize = splitmux_part_reader_finalize;
624 
625   gstelement_class->change_state = gst_splitmux_part_reader_change_state;
626   gstelement_class->send_event = gst_splitmux_part_reader_send_event;
627 
628   gstbin_class->handle_message = bus_handler;
629 }
630 
631 static void
gst_splitmux_part_reader_init(GstSplitMuxPartReader * reader)632 gst_splitmux_part_reader_init (GstSplitMuxPartReader * reader)
633 {
634   GstElement *typefind;
635 
636   reader->active = FALSE;
637   reader->duration = GST_CLOCK_TIME_NONE;
638 
639   g_cond_init (&reader->inactive_cond);
640   g_mutex_init (&reader->lock);
641   g_mutex_init (&reader->type_lock);
642   g_mutex_init (&reader->msg_lock);
643 
644   /* FIXME: Create elements on a state change */
645   reader->src = gst_element_factory_make ("filesrc", NULL);
646   if (reader->src == NULL) {
647     GST_ERROR_OBJECT (reader, "Failed to create filesrc element");
648     return;
649   }
650   gst_bin_add (GST_BIN_CAST (reader), reader->src);
651 
652   typefind = gst_element_factory_make ("typefind", NULL);
653   if (!typefind) {
654     GST_ERROR_OBJECT (reader,
655         "Failed to create typefind element - check your installation");
656     return;
657   }
658 
659   gst_bin_add (GST_BIN_CAST (reader), typefind);
660   reader->typefind = typefind;
661 
662   if (!gst_element_link_pads (reader->src, NULL, typefind, "sink")) {
663     GST_ERROR_OBJECT (reader,
664         "Failed to link typefind element - check your installation");
665     return;
666   }
667 
668   g_signal_connect (reader->typefind, "have-type", G_CALLBACK (type_found),
669       reader);
670 }
671 
672 static void
splitmux_part_reader_dispose(GObject * object)673 splitmux_part_reader_dispose (GObject * object)
674 {
675   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
676 
677   splitmux_part_reader_reset (reader);
678 
679   G_OBJECT_CLASS (parent_class)->dispose (object);
680 }
681 
682 static void
splitmux_part_reader_finalize(GObject * object)683 splitmux_part_reader_finalize (GObject * object)
684 {
685   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) object;
686 
687   g_cond_clear (&reader->inactive_cond);
688   g_mutex_clear (&reader->lock);
689   g_mutex_clear (&reader->type_lock);
690   g_mutex_clear (&reader->msg_lock);
691 
692   g_free (reader->path);
693 
694   G_OBJECT_CLASS (parent_class)->finalize (object);
695 }
696 
697 static void
do_async_start(GstSplitMuxPartReader * reader)698 do_async_start (GstSplitMuxPartReader * reader)
699 {
700   GstMessage *message;
701 
702   SPLITMUX_PART_MSG_LOCK (reader);
703   reader->async_pending = TRUE;
704 
705   message = gst_message_new_async_start (GST_OBJECT_CAST (reader));
706   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader), message);
707   SPLITMUX_PART_MSG_UNLOCK (reader);
708 }
709 
710 static void
do_async_done(GstSplitMuxPartReader * reader)711 do_async_done (GstSplitMuxPartReader * reader)
712 {
713   GstMessage *message;
714 
715   SPLITMUX_PART_MSG_LOCK (reader);
716   if (reader->async_pending) {
717     message =
718         gst_message_new_async_done (GST_OBJECT_CAST (reader),
719         GST_CLOCK_TIME_NONE);
720     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST (reader),
721         message);
722 
723     reader->async_pending = FALSE;
724   }
725   SPLITMUX_PART_MSG_UNLOCK (reader);
726 }
727 
728 static void
splitmux_part_reader_reset(GstSplitMuxPartReader * reader)729 splitmux_part_reader_reset (GstSplitMuxPartReader * reader)
730 {
731   GList *cur;
732 
733   SPLITMUX_PART_LOCK (reader);
734   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
735     GstPad *pad = GST_PAD_CAST (cur->data);
736     gst_pad_set_active (GST_PAD_CAST (pad), FALSE);
737     gst_object_unref (GST_OBJECT_CAST (pad));
738   }
739 
740   g_list_free (reader->pads);
741   reader->pads = NULL;
742   SPLITMUX_PART_UNLOCK (reader);
743 }
744 
745 static GstSplitMuxPartPad *
gst_splitmux_part_reader_new_proxy_pad(GstSplitMuxPartReader * reader,GstPad * target)746 gst_splitmux_part_reader_new_proxy_pad (GstSplitMuxPartReader * reader,
747     GstPad * target)
748 {
749   GstSplitMuxPartPad *pad = g_object_new (SPLITMUX_TYPE_PART_PAD,
750       "name", GST_PAD_NAME (target),
751       "direction", GST_PAD_SINK,
752       NULL);
753   pad->target = target;
754   pad->reader = reader;
755 
756   gst_pad_set_active (GST_PAD_CAST (pad), TRUE);
757 
758   return pad;
759 }
760 
761 static void
new_decoded_pad_added_cb(GstElement * element,GstPad * pad,GstSplitMuxPartReader * reader)762 new_decoded_pad_added_cb (GstElement * element, GstPad * pad,
763     GstSplitMuxPartReader * reader)
764 {
765   GstPad *out_pad = NULL;
766   GstSplitMuxPartPad *proxy_pad;
767   GstCaps *caps;
768   GstPadLinkReturn link_ret;
769 
770   caps = gst_pad_get_current_caps (pad);
771 
772   GST_DEBUG_OBJECT (reader, "file %s new decoded pad %" GST_PTR_FORMAT
773       " caps %" GST_PTR_FORMAT, reader->path, pad, caps);
774 
775   gst_caps_unref (caps);
776 
777   /* Look up or create the output pad */
778   if (reader->get_pad_cb)
779     out_pad = reader->get_pad_cb (reader, pad, reader->cb_data);
780   if (out_pad == NULL) {
781     GST_DEBUG_OBJECT (reader,
782         "No output pad for %" GST_PTR_FORMAT ". Ignoring", pad);
783     return;
784   }
785 
786   /* Create our proxy pad to interact with this new pad */
787   proxy_pad = gst_splitmux_part_reader_new_proxy_pad (reader, out_pad);
788   GST_DEBUG_OBJECT (reader,
789       "created proxy pad %" GST_PTR_FORMAT " for target %" GST_PTR_FORMAT,
790       proxy_pad, out_pad);
791 
792   link_ret = gst_pad_link (pad, GST_PAD (proxy_pad));
793   if (link_ret != GST_PAD_LINK_OK) {
794     gst_object_unref (proxy_pad);
795     GST_ELEMENT_ERROR (reader, STREAM, FAILED, (NULL),
796         ("Failed to link proxy pad for stream part %s pad %" GST_PTR_FORMAT
797             " ret %d", reader->path, pad, link_ret));
798     return;
799   }
800   GST_DEBUG_OBJECT (reader,
801       "new decoded pad %" GST_PTR_FORMAT " linked to %" GST_PTR_FORMAT,
802       pad, proxy_pad);
803 
804   SPLITMUX_PART_LOCK (reader);
805   reader->pads = g_list_prepend (reader->pads, proxy_pad);
806   SPLITMUX_PART_UNLOCK (reader);
807 }
808 
809 static gboolean
gst_splitmux_part_reader_send_event(GstElement * element,GstEvent * event)810 gst_splitmux_part_reader_send_event (GstElement * element, GstEvent * event)
811 {
812   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
813   gboolean ret = FALSE;
814   GstPad *pad = NULL;
815 
816   /* Send event to the first source pad we found */
817   SPLITMUX_PART_LOCK (reader);
818   if (reader->pads) {
819     GstPad *proxy_pad = GST_PAD_CAST (reader->pads->data);
820     pad = gst_pad_get_peer (proxy_pad);
821   }
822   SPLITMUX_PART_UNLOCK (reader);
823 
824   if (pad) {
825     ret = gst_pad_send_event (pad, event);
826     gst_object_unref (pad);
827   } else {
828     gst_event_unref (event);
829   }
830 
831   return ret;
832 }
833 
834 /* Called with lock held. Seeks to an 'internal' time from 0 to length of this piece */
835 static void
gst_splitmux_part_reader_seek_to_time_locked(GstSplitMuxPartReader * reader,GstClockTime time)836 gst_splitmux_part_reader_seek_to_time_locked (GstSplitMuxPartReader * reader,
837     GstClockTime time)
838 {
839   SPLITMUX_PART_UNLOCK (reader);
840   GST_DEBUG_OBJECT (reader, "Seeking to time %" GST_TIME_FORMAT,
841       GST_TIME_ARGS (time));
842   gst_element_seek (GST_ELEMENT_CAST (reader), 1.0, GST_FORMAT_TIME,
843       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, time,
844       GST_SEEK_TYPE_END, 0);
845 
846   SPLITMUX_PART_LOCK (reader);
847 
848   /* Wait for flush to finish, so old data is gone */
849   while (reader->flushing) {
850     GST_LOG_OBJECT (reader, "%s Waiting for flush to finish", reader->path);
851     SPLITMUX_PART_WAIT (reader);
852   }
853 }
854 
855 /* Map the passed segment to 'internal' time from 0 to length of this piece and seek. Lock cannot be held */
856 static gboolean
gst_splitmux_part_reader_seek_to_segment(GstSplitMuxPartReader * reader,GstSegment * target_seg,GstSeekFlags extra_flags)857 gst_splitmux_part_reader_seek_to_segment (GstSplitMuxPartReader * reader,
858     GstSegment * target_seg, GstSeekFlags extra_flags)
859 {
860   GstSeekFlags flags;
861   GstClockTime start = 0, stop = GST_CLOCK_TIME_NONE;
862 
863   flags = target_seg->flags | GST_SEEK_FLAG_FLUSH | extra_flags;
864 
865   SPLITMUX_PART_LOCK (reader);
866   if (target_seg->start >= reader->start_offset)
867     start = target_seg->start - reader->start_offset;
868   /* If the segment stop is within this part, don't play to the end */
869   if (target_seg->stop != -1 &&
870       target_seg->stop < reader->start_offset + reader->duration)
871     stop = target_seg->stop - reader->start_offset;
872 
873   SPLITMUX_PART_UNLOCK (reader);
874 
875   GST_DEBUG_OBJECT (reader,
876       "Seeking rate %f format %d flags 0x%x start %" GST_TIME_FORMAT " stop %"
877       GST_TIME_FORMAT, target_seg->rate, target_seg->format, flags,
878       GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
879 
880   return gst_element_seek (GST_ELEMENT_CAST (reader), target_seg->rate,
881       target_seg->format, flags, GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET,
882       stop);
883 }
884 
885 /* Called with lock held */
886 static void
gst_splitmux_part_reader_measure_streams(GstSplitMuxPartReader * reader)887 gst_splitmux_part_reader_measure_streams (GstSplitMuxPartReader * reader)
888 {
889   SPLITMUX_PART_LOCK (reader);
890   /* Trigger a flushing seek to near the end of the file and run each stream
891    * to EOS in order to find the smallest end timestamp to start the next
892    * file from
893    */
894   if (GST_CLOCK_TIME_IS_VALID (reader->duration)
895       && reader->duration > GST_SECOND) {
896     GstClockTime seek_ts = reader->duration - (0.5 * GST_SECOND);
897     gst_splitmux_part_reader_seek_to_time_locked (reader, seek_ts);
898   }
899   SPLITMUX_PART_UNLOCK (reader);
900 }
901 
902 static void
gst_splitmux_part_reader_finish_measuring_streams(GstSplitMuxPartReader * reader)903 gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
904     reader)
905 {
906   SPLITMUX_PART_LOCK (reader);
907   if (reader->prep_state == PART_STATE_PREPARING_RESET_FOR_READY) {
908     /* Fire the prepared signal and go to READY state */
909     GST_DEBUG_OBJECT (reader,
910         "Stream measuring complete. File %s is now ready", reader->path);
911     reader->prep_state = PART_STATE_READY;
912     SPLITMUX_PART_UNLOCK (reader);
913     do_async_done (reader);
914   } else {
915     SPLITMUX_PART_UNLOCK (reader);
916   }
917 }
918 
919 static GstElement *
find_demuxer(GstCaps * caps)920 find_demuxer (GstCaps * caps)
921 {
922   GList *factories =
923       gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DEMUXER,
924       GST_RANK_MARGINAL);
925   GList *compat_elements;
926   GstElement *e = NULL;
927 
928   if (factories == NULL)
929     return NULL;
930 
931   compat_elements =
932       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK, TRUE);
933 
934   if (compat_elements) {
935     /* Just take the first (highest ranked) option */
936     GstElementFactory *factory =
937         GST_ELEMENT_FACTORY_CAST (compat_elements->data);
938     e = gst_element_factory_create (factory, NULL);
939     gst_plugin_feature_list_free (compat_elements);
940   }
941 
942   if (factories)
943     gst_plugin_feature_list_free (factories);
944 
945   return e;
946 }
947 
948 static void
type_found(GstElement * typefind,guint probability,GstCaps * caps,GstSplitMuxPartReader * reader)949 type_found (GstElement * typefind, guint probability,
950     GstCaps * caps, GstSplitMuxPartReader * reader)
951 {
952   GstElement *demux;
953 
954   GST_INFO_OBJECT (reader, "Got type %" GST_PTR_FORMAT, caps);
955 
956   /* typefind found a type. Look for the demuxer to handle it */
957   demux = reader->demux = find_demuxer (caps);
958   if (reader->demux == NULL) {
959     GST_ERROR_OBJECT (reader, "Failed to create demuxer element");
960     return;
961   }
962 
963   /* Connect to demux signals */
964   g_signal_connect (demux,
965       "pad-added", G_CALLBACK (new_decoded_pad_added_cb), reader);
966   g_signal_connect (demux, "no-more-pads", G_CALLBACK (no_more_pads), reader);
967 
968   gst_element_set_locked_state (demux, TRUE);
969   gst_bin_add (GST_BIN_CAST (reader), demux);
970   gst_element_link_pads (reader->typefind, "src", demux, NULL);
971   gst_element_set_state (reader->demux, GST_STATE_TARGET (reader));
972   gst_element_set_locked_state (demux, FALSE);
973 }
974 
975 static void
check_if_pads_collected(GstSplitMuxPartReader * reader)976 check_if_pads_collected (GstSplitMuxPartReader * reader)
977 {
978   if (reader->prep_state == PART_STATE_PREPARING_COLLECT_STREAMS) {
979     /* Check we have all pads and each pad has seen a buffer */
980     if (reader->no_more_pads && splitmux_part_is_prerolled_locked (reader)) {
981       GST_DEBUG_OBJECT (reader,
982           "no more pads - file %s. Measuring stream length", reader->path);
983       reader->prep_state = PART_STATE_PREPARING_MEASURE_STREAMS;
984       gst_element_call_async (GST_ELEMENT_CAST (reader),
985           (GstElementCallAsyncFunc) gst_splitmux_part_reader_measure_streams,
986           NULL, NULL);
987     }
988   }
989 }
990 
991 static void
no_more_pads(GstElement * element,GstSplitMuxPartReader * reader)992 no_more_pads (GstElement * element, GstSplitMuxPartReader * reader)
993 {
994   GstClockTime duration = GST_CLOCK_TIME_NONE;
995   GList *cur;
996   /* Query the minimum duration of any pad in this piece and store it.
997    * FIXME: Only consider audio and video */
998   SPLITMUX_PART_LOCK (reader);
999   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1000     GstPad *target = GST_PAD_CAST (cur->data);
1001     if (target) {
1002       gint64 cur_duration;
1003       if (gst_pad_peer_query_duration (target, GST_FORMAT_TIME, &cur_duration)) {
1004         GST_INFO_OBJECT (reader,
1005             "file %s pad %" GST_PTR_FORMAT " duration %" GST_TIME_FORMAT,
1006             reader->path, target, GST_TIME_ARGS (cur_duration));
1007         if (cur_duration < duration)
1008           duration = cur_duration;
1009       }
1010     }
1011   }
1012   GST_INFO_OBJECT (reader, "file %s duration %" GST_TIME_FORMAT,
1013       reader->path, GST_TIME_ARGS (duration));
1014   reader->duration = (GstClockTime) duration;
1015 
1016   reader->no_more_pads = TRUE;
1017 
1018   check_if_pads_collected (reader);
1019   SPLITMUX_PART_UNLOCK (reader);
1020 }
1021 
1022 gboolean
gst_splitmux_part_reader_src_query(GstSplitMuxPartReader * part,GstPad * src_pad,GstQuery * query)1023 gst_splitmux_part_reader_src_query (GstSplitMuxPartReader * part,
1024     GstPad * src_pad, GstQuery * query)
1025 {
1026   GstPad *target = NULL;
1027   gboolean ret;
1028   GList *cur;
1029 
1030   SPLITMUX_PART_LOCK (part);
1031   /* Find the pad corresponding to the visible output target pad */
1032   for (cur = g_list_first (part->pads); cur != NULL; cur = g_list_next (cur)) {
1033     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1034     if (part_pad->target == src_pad) {
1035       target = gst_object_ref (GST_OBJECT_CAST (part_pad));
1036       break;
1037     }
1038   }
1039   SPLITMUX_PART_UNLOCK (part);
1040 
1041   if (target == NULL)
1042     return FALSE;
1043 
1044   ret = gst_pad_peer_query (target, query);
1045 
1046   if (ret == FALSE)
1047     goto out;
1048 
1049   /* Post-massaging of queries */
1050   switch (GST_QUERY_TYPE (query)) {
1051     case GST_QUERY_POSITION:{
1052       GstFormat fmt;
1053       gint64 position;
1054 
1055       gst_query_parse_position (query, &fmt, &position);
1056       if (fmt != GST_FORMAT_TIME)
1057         return FALSE;
1058       SPLITMUX_PART_LOCK (part);
1059       position += part->start_offset;
1060       GST_LOG_OBJECT (part, "Position %" GST_TIME_FORMAT,
1061           GST_TIME_ARGS (position));
1062       SPLITMUX_PART_UNLOCK (part);
1063 
1064       gst_query_set_position (query, fmt, position);
1065       break;
1066     }
1067     default:
1068       break;
1069   }
1070 
1071 out:
1072   gst_object_unref (target);
1073   return ret;
1074 }
1075 
1076 static GstStateChangeReturn
gst_splitmux_part_reader_change_state(GstElement * element,GstStateChange transition)1077 gst_splitmux_part_reader_change_state (GstElement * element,
1078     GstStateChange transition)
1079 {
1080   GstStateChangeReturn ret;
1081   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) element;
1082 
1083   switch (transition) {
1084     case GST_STATE_CHANGE_NULL_TO_READY:{
1085       break;
1086     }
1087     case GST_STATE_CHANGE_READY_TO_PAUSED:{
1088       SPLITMUX_PART_LOCK (reader);
1089       g_object_set (reader->src, "location", reader->path, NULL);
1090       reader->prep_state = PART_STATE_PREPARING_COLLECT_STREAMS;
1091       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1092       reader->running = TRUE;
1093       SPLITMUX_PART_UNLOCK (reader);
1094 
1095       /* we go to PAUSED asynchronously once all streams have been collected
1096        * and seeks to measure the stream lengths are done */
1097       do_async_start (reader);
1098       break;
1099     }
1100     case GST_STATE_CHANGE_READY_TO_NULL:
1101     case GST_STATE_CHANGE_PAUSED_TO_READY:
1102       SPLITMUX_PART_LOCK (reader);
1103       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1104       reader->running = FALSE;
1105       SPLITMUX_PART_BROADCAST (reader);
1106       SPLITMUX_PART_UNLOCK (reader);
1107       break;
1108     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1109       SPLITMUX_PART_LOCK (reader);
1110       reader->active = FALSE;
1111       gst_splitmux_part_reader_set_flushing_locked (reader, TRUE);
1112       SPLITMUX_PART_BROADCAST (reader);
1113       SPLITMUX_PART_UNLOCK (reader);
1114       break;
1115     default:
1116       break;
1117   }
1118 
1119   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1120   if (ret == GST_STATE_CHANGE_FAILURE) {
1121     do_async_done (reader);
1122     goto beach;
1123   }
1124 
1125   switch (transition) {
1126     case GST_STATE_CHANGE_READY_TO_PAUSED:
1127       ret = GST_STATE_CHANGE_ASYNC;
1128       break;
1129     case GST_STATE_CHANGE_PAUSED_TO_READY:
1130       do_async_done (reader);
1131       break;
1132     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1133       SPLITMUX_PART_LOCK (reader);
1134       gst_splitmux_part_reader_set_flushing_locked (reader, FALSE);
1135       reader->active = TRUE;
1136       SPLITMUX_PART_BROADCAST (reader);
1137       SPLITMUX_PART_UNLOCK (reader);
1138       break;
1139     case GST_STATE_CHANGE_READY_TO_NULL:
1140       reader->prep_state = PART_STATE_NULL;
1141       splitmux_part_reader_reset (reader);
1142       break;
1143     default:
1144       break;
1145   }
1146 
1147 beach:
1148   return ret;
1149 }
1150 
1151 gboolean
gst_splitmux_part_reader_prepare(GstSplitMuxPartReader * part)1152 gst_splitmux_part_reader_prepare (GstSplitMuxPartReader * part)
1153 {
1154   GstStateChangeReturn ret;
1155 
1156   ret = gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_PAUSED);
1157 
1158   if (ret == GST_STATE_CHANGE_FAILURE)
1159     return FALSE;
1160 
1161   return TRUE;
1162 }
1163 
1164 void
gst_splitmux_part_reader_unprepare(GstSplitMuxPartReader * part)1165 gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader * part)
1166 {
1167   gst_element_set_state (GST_ELEMENT_CAST (part), GST_STATE_NULL);
1168 }
1169 
1170 void
gst_splitmux_part_reader_set_location(GstSplitMuxPartReader * reader,const gchar * path)1171 gst_splitmux_part_reader_set_location (GstSplitMuxPartReader * reader,
1172     const gchar * path)
1173 {
1174   reader->path = g_strdup (path);
1175 }
1176 
1177 gboolean
gst_splitmux_part_reader_activate(GstSplitMuxPartReader * reader,GstSegment * seg,GstSeekFlags extra_flags)1178 gst_splitmux_part_reader_activate (GstSplitMuxPartReader * reader,
1179     GstSegment * seg, GstSeekFlags extra_flags)
1180 {
1181   GST_DEBUG_OBJECT (reader, "Activating part reader");
1182 
1183   if (!gst_splitmux_part_reader_seek_to_segment (reader, seg, extra_flags)) {
1184     GST_ERROR_OBJECT (reader, "Failed to seek part to %" GST_SEGMENT_FORMAT,
1185         seg);
1186     return FALSE;
1187   }
1188   if (gst_element_set_state (GST_ELEMENT_CAST (reader),
1189           GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
1190     GST_ERROR_OBJECT (reader, "Failed to set state to PLAYING");
1191     return FALSE;
1192   }
1193   return TRUE;
1194 }
1195 
1196 gboolean
gst_splitmux_part_reader_is_active(GstSplitMuxPartReader * part)1197 gst_splitmux_part_reader_is_active (GstSplitMuxPartReader * part)
1198 {
1199   gboolean ret;
1200 
1201   SPLITMUX_PART_LOCK (part);
1202   ret = part->active;
1203   SPLITMUX_PART_UNLOCK (part);
1204 
1205   return ret;
1206 }
1207 
1208 void
gst_splitmux_part_reader_deactivate(GstSplitMuxPartReader * reader)1209 gst_splitmux_part_reader_deactivate (GstSplitMuxPartReader * reader)
1210 {
1211   GST_DEBUG_OBJECT (reader, "Deactivating reader");
1212   gst_element_set_state (GST_ELEMENT_CAST (reader), GST_STATE_PAUSED);
1213 }
1214 
1215 void
gst_splitmux_part_reader_set_flushing_locked(GstSplitMuxPartReader * reader,gboolean flushing)1216 gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
1217     gboolean flushing)
1218 {
1219   GList *cur;
1220 
1221   GST_LOG_OBJECT (reader, "%s dataqueues",
1222       flushing ? "Flushing" : "Done flushing");
1223   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1224     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1225     gst_data_queue_set_flushing (part_pad->queue, flushing);
1226     if (flushing)
1227       gst_data_queue_flush (part_pad->queue);
1228   }
1229 };
1230 
1231 void
gst_splitmux_part_reader_set_callbacks(GstSplitMuxPartReader * reader,gpointer cb_data,GstSplitMuxPartReaderPadCb get_pad_cb)1232 gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
1233     gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb)
1234 {
1235   reader->cb_data = cb_data;
1236   reader->get_pad_cb = get_pad_cb;
1237 }
1238 
1239 GstClockTime
gst_splitmux_part_reader_get_end_offset(GstSplitMuxPartReader * reader)1240 gst_splitmux_part_reader_get_end_offset (GstSplitMuxPartReader * reader)
1241 {
1242   GList *cur;
1243   GstClockTime ret = GST_CLOCK_TIME_NONE;
1244 
1245   SPLITMUX_PART_LOCK (reader);
1246   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1247     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1248     if (!part_pad->is_sparse && part_pad->max_ts < ret)
1249       ret = part_pad->max_ts;
1250   }
1251 
1252   SPLITMUX_PART_UNLOCK (reader);
1253 
1254   return ret;
1255 }
1256 
1257 void
gst_splitmux_part_reader_set_start_offset(GstSplitMuxPartReader * reader,GstClockTime time_offset,GstClockTime ts_offset)1258 gst_splitmux_part_reader_set_start_offset (GstSplitMuxPartReader * reader,
1259     GstClockTime time_offset, GstClockTime ts_offset)
1260 {
1261   SPLITMUX_PART_LOCK (reader);
1262   reader->start_offset = time_offset;
1263   reader->ts_offset = ts_offset;
1264   GST_INFO_OBJECT (reader, "Time offset now %" GST_TIME_FORMAT,
1265       GST_TIME_ARGS (time_offset));
1266   SPLITMUX_PART_UNLOCK (reader);
1267 }
1268 
1269 GstClockTime
gst_splitmux_part_reader_get_start_offset(GstSplitMuxPartReader * reader)1270 gst_splitmux_part_reader_get_start_offset (GstSplitMuxPartReader * reader)
1271 {
1272   GstClockTime ret = GST_CLOCK_TIME_NONE;
1273 
1274   SPLITMUX_PART_LOCK (reader);
1275   ret = reader->start_offset;
1276   SPLITMUX_PART_UNLOCK (reader);
1277 
1278   return ret;
1279 }
1280 
1281 GstClockTime
gst_splitmux_part_reader_get_duration(GstSplitMuxPartReader * reader)1282 gst_splitmux_part_reader_get_duration (GstSplitMuxPartReader * reader)
1283 {
1284   GstClockTime dur;
1285 
1286   SPLITMUX_PART_LOCK (reader);
1287   dur = reader->duration;
1288   SPLITMUX_PART_UNLOCK (reader);
1289 
1290   return dur;
1291 }
1292 
1293 GstPad *
gst_splitmux_part_reader_lookup_pad(GstSplitMuxPartReader * reader,GstPad * target)1294 gst_splitmux_part_reader_lookup_pad (GstSplitMuxPartReader * reader,
1295     GstPad * target)
1296 {
1297   GstPad *result = NULL;
1298   GList *cur;
1299 
1300   SPLITMUX_PART_LOCK (reader);
1301   for (cur = g_list_first (reader->pads); cur != NULL; cur = g_list_next (cur)) {
1302     GstSplitMuxPartPad *part_pad = SPLITMUX_PART_PAD_CAST (cur->data);
1303     if (part_pad->target == target) {
1304       result = (GstPad *) gst_object_ref (part_pad);
1305       break;
1306     }
1307   }
1308   SPLITMUX_PART_UNLOCK (reader);
1309 
1310   return result;
1311 }
1312 
1313 GstFlowReturn
gst_splitmux_part_reader_pop(GstSplitMuxPartReader * reader,GstPad * pad,GstDataQueueItem ** item)1314 gst_splitmux_part_reader_pop (GstSplitMuxPartReader * reader, GstPad * pad,
1315     GstDataQueueItem ** item)
1316 {
1317   GstSplitMuxPartPad *part_pad = (GstSplitMuxPartPad *) (pad);
1318   GstDataQueue *q;
1319   GstFlowReturn ret;
1320 
1321   /* Get one item from the appropriate dataqueue */
1322   SPLITMUX_PART_LOCK (reader);
1323   if (reader->prep_state == PART_STATE_FAILED) {
1324     SPLITMUX_PART_UNLOCK (reader);
1325     return GST_FLOW_ERROR;
1326   }
1327 
1328   q = gst_object_ref (part_pad->queue);
1329 
1330   /* Have to drop the lock around pop, so we can be woken up for flush */
1331   SPLITMUX_PART_UNLOCK (reader);
1332   if (!gst_data_queue_pop (q, item) || (*item == NULL)) {
1333     ret = GST_FLOW_FLUSHING;
1334     goto out;
1335   }
1336 
1337   SPLITMUX_PART_LOCK (reader);
1338 
1339   SPLITMUX_PART_BROADCAST (reader);
1340   if (GST_IS_EVENT ((*item)->object)) {
1341     GstEvent *e = (GstEvent *) ((*item)->object);
1342     /* Mark this pad as EOS */
1343     if (GST_EVENT_TYPE (e) == GST_EVENT_EOS)
1344       part_pad->is_eos = TRUE;
1345   }
1346 
1347   SPLITMUX_PART_UNLOCK (reader);
1348 
1349   ret = GST_FLOW_OK;
1350 out:
1351   gst_object_unref (q);
1352   return ret;
1353 }
1354 
1355 static void
bus_handler(GstBin * bin,GstMessage * message)1356 bus_handler (GstBin * bin, GstMessage * message)
1357 {
1358   GstSplitMuxPartReader *reader = (GstSplitMuxPartReader *) bin;
1359 
1360   switch (GST_MESSAGE_TYPE (message)) {
1361     case GST_MESSAGE_ERROR:
1362       /* Make sure to set the state to failed and wake up the listener
1363        * on error */
1364       SPLITMUX_PART_LOCK (reader);
1365       GST_ERROR_OBJECT (reader, "Got error message from child %" GST_PTR_FORMAT
1366           " marking this reader as failed", GST_MESSAGE_SRC (message));
1367       reader->prep_state = PART_STATE_FAILED;
1368       SPLITMUX_PART_BROADCAST (reader);
1369       SPLITMUX_PART_UNLOCK (reader);
1370       do_async_done (reader);
1371       break;
1372     default:
1373       break;
1374   }
1375 
1376   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1377 }
1378