• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
4  * Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 
26 #include "gstgiobasesrc.h"
27 
28 #include <gst/base/gsttypefindhelper.h>
29 
30 GST_DEBUG_CATEGORY_STATIC (gst_gio_base_src_debug);
31 #define GST_CAT_DEFAULT gst_gio_base_src_debug
32 
33 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
34     GST_PAD_SRC,
35     GST_PAD_ALWAYS,
36     GST_STATIC_CAPS_ANY);
37 
38 #define gst_gio_base_src_parent_class parent_class
39 G_DEFINE_TYPE (GstGioBaseSrc, gst_gio_base_src, GST_TYPE_BASE_SRC);
40 
41 static void gst_gio_base_src_finalize (GObject * object);
42 
43 static gboolean gst_gio_base_src_start (GstBaseSrc * base_src);
44 static gboolean gst_gio_base_src_stop (GstBaseSrc * base_src);
45 static gboolean gst_gio_base_src_get_size (GstBaseSrc * base_src,
46     guint64 * size);
47 static gboolean gst_gio_base_src_is_seekable (GstBaseSrc * base_src);
48 static gboolean gst_gio_base_src_unlock (GstBaseSrc * base_src);
49 static gboolean gst_gio_base_src_unlock_stop (GstBaseSrc * base_src);
50 static GstFlowReturn gst_gio_base_src_create (GstBaseSrc * base_src,
51     guint64 offset, guint size, GstBuffer ** buf);
52 static gboolean gst_gio_base_src_query (GstBaseSrc * base_src,
53     GstQuery * query);
54 
55 static void
gst_gio_base_src_class_init(GstGioBaseSrcClass * klass)56 gst_gio_base_src_class_init (GstGioBaseSrcClass * klass)
57 {
58   GObjectClass *gobject_class = (GObjectClass *) klass;
59   GstElementClass *gstelement_class = (GstElementClass *) klass;
60   GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
61 
62   GST_DEBUG_CATEGORY_INIT (gst_gio_base_src_debug, "gio_base_src", 0,
63       "GIO base source");
64 
65   gobject_class->finalize = gst_gio_base_src_finalize;
66 
67   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
68 
69   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_gio_base_src_start);
70   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_gio_base_src_stop);
71   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_gio_base_src_get_size);
72   gstbasesrc_class->is_seekable =
73       GST_DEBUG_FUNCPTR (gst_gio_base_src_is_seekable);
74   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock);
75   gstbasesrc_class->unlock_stop =
76       GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock_stop);
77   gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_gio_base_src_create);
78   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_base_src_query);
79 }
80 
81 static void
gst_gio_base_src_init(GstGioBaseSrc * src)82 gst_gio_base_src_init (GstGioBaseSrc * src)
83 {
84   src->cancel = g_cancellable_new ();
85 }
86 
87 static void
gst_gio_base_src_finalize(GObject * object)88 gst_gio_base_src_finalize (GObject * object)
89 {
90   GstGioBaseSrc *src = GST_GIO_BASE_SRC (object);
91 
92   if (src->cancel) {
93     g_object_unref (src->cancel);
94     src->cancel = NULL;
95   }
96 
97   if (src->stream) {
98     g_object_unref (src->stream);
99     src->stream = NULL;
100   }
101 
102   if (src->cache) {
103     gst_buffer_unref (src->cache);
104     src->cache = NULL;
105   }
106 
107   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
108 }
109 
110 static gboolean
gst_gio_base_src_start(GstBaseSrc * base_src)111 gst_gio_base_src_start (GstBaseSrc * base_src)
112 {
113   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
114   GstGioBaseSrcClass *gbsrc_class = GST_GIO_BASE_SRC_GET_CLASS (src);
115 
116   src->position = 0;
117 
118   /* FIXME: This will likely block */
119   src->stream = gbsrc_class->get_stream (src);
120   if (G_UNLIKELY (!G_IS_INPUT_STREAM (src->stream))) {
121     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
122         ("No input stream provided by subclass"));
123     return FALSE;
124   } else if (G_UNLIKELY (g_input_stream_is_closed (src->stream))) {
125     GST_ELEMENT_ERROR (src, LIBRARY, FAILED, (NULL),
126         ("Input stream is already closed"));
127     return FALSE;
128   }
129 
130   if (G_IS_SEEKABLE (src->stream))
131     src->position = g_seekable_tell (G_SEEKABLE (src->stream));
132 
133   GST_DEBUG_OBJECT (src, "started source");
134 
135   return TRUE;
136 }
137 
138 static gboolean
gst_gio_base_src_stop(GstBaseSrc * base_src)139 gst_gio_base_src_stop (GstBaseSrc * base_src)
140 {
141   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
142   GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
143   gboolean success;
144   GError *err = NULL;
145 
146   if (klass->close_on_stop && G_IS_INPUT_STREAM (src->stream)) {
147     GST_DEBUG_OBJECT (src, "closing stream");
148 
149     /* FIXME: can block but unfortunately we can't use async operations
150      * here because they require a running main loop */
151     success = g_input_stream_close (src->stream, src->cancel, &err);
152 
153     if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) {
154       GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
155           ("g_input_stream_close failed: %s", err->message));
156       g_clear_error (&err);
157     } else if (!success) {
158       GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
159           ("g_input_stream_close failed"));
160     } else {
161       GST_DEBUG_OBJECT (src, "g_input_stream_close succeeded");
162     }
163 
164     g_object_unref (src->stream);
165     src->stream = NULL;
166   } else {
167     g_object_unref (src->stream);
168     src->stream = NULL;
169   }
170 
171   return TRUE;
172 }
173 
174 static gboolean
gst_gio_base_src_get_size(GstBaseSrc * base_src,guint64 * size)175 gst_gio_base_src_get_size (GstBaseSrc * base_src, guint64 * size)
176 {
177   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
178 
179   if (G_IS_FILE_INPUT_STREAM (src->stream)) {
180     GFileInfo *info;
181     GError *err = NULL;
182 
183     info = g_file_input_stream_query_info (G_FILE_INPUT_STREAM (src->stream),
184         G_FILE_ATTRIBUTE_STANDARD_SIZE, src->cancel, &err);
185 
186     if (info != NULL) {
187       *size = g_file_info_get_size (info);
188       g_object_unref (info);
189       GST_DEBUG_OBJECT (src, "found size: %" G_GUINT64_FORMAT, *size);
190       return TRUE;
191     }
192 
193     if (!gst_gio_error (src, "g_file_input_stream_query_info", &err, NULL)) {
194 
195       if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
196         GST_DEBUG_OBJECT (src, "size information not available");
197       else
198         GST_WARNING_OBJECT (src, "size information retrieval failed: %s",
199             err->message);
200 
201       g_clear_error (&err);
202     }
203   }
204 
205   if (GST_GIO_STREAM_IS_SEEKABLE (src->stream)) {
206     goffset old;
207     goffset stream_size;
208     gboolean ret;
209     GSeekable *seekable = G_SEEKABLE (src->stream);
210     GError *err = NULL;
211 
212     old = g_seekable_tell (seekable);
213 
214     ret = g_seekable_seek (seekable, 0, G_SEEK_END, src->cancel, &err);
215     if (!ret) {
216       if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
217         if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
218           GST_DEBUG_OBJECT (src,
219               "Seeking to the end of stream is not supported");
220         else
221           GST_WARNING_OBJECT (src, "Seeking to end of stream failed: %s",
222               err->message);
223         g_clear_error (&err);
224       } else {
225         GST_WARNING_OBJECT (src, "Seeking to end of stream failed");
226       }
227       return FALSE;
228     }
229 
230     stream_size = g_seekable_tell (seekable);
231 
232     ret = g_seekable_seek (seekable, old, G_SEEK_SET, src->cancel, &err);
233     if (!ret) {
234       if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
235         if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
236           GST_ERROR_OBJECT (src, "Seeking to the old position not supported");
237         else
238           GST_ERROR_OBJECT (src, "Seeking to the old position failed: %s",
239               err->message);
240         g_clear_error (&err);
241       } else {
242         GST_ERROR_OBJECT (src, "Seeking to the old position faile");
243       }
244       return FALSE;
245     }
246 
247     if (stream_size >= 0) {
248       *size = stream_size;
249       return TRUE;
250     }
251   }
252 
253   return FALSE;
254 }
255 
256 static gboolean
gst_gio_base_src_is_seekable(GstBaseSrc * base_src)257 gst_gio_base_src_is_seekable (GstBaseSrc * base_src)
258 {
259   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
260   gboolean seekable;
261 
262   seekable = GST_GIO_STREAM_IS_SEEKABLE (src->stream);
263 
264   GST_DEBUG_OBJECT (src, "can seek: %d", seekable);
265 
266   return seekable;
267 }
268 
269 static gboolean
gst_gio_base_src_unlock(GstBaseSrc * base_src)270 gst_gio_base_src_unlock (GstBaseSrc * base_src)
271 {
272   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
273 
274   GST_LOG_OBJECT (src, "triggering cancellation");
275 
276   g_cancellable_cancel (src->cancel);
277 
278   return TRUE;
279 }
280 
281 static gboolean
gst_gio_base_src_unlock_stop(GstBaseSrc * base_src)282 gst_gio_base_src_unlock_stop (GstBaseSrc * base_src)
283 {
284   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
285 
286   GST_LOG_OBJECT (src, "resetting cancellable");
287 
288   g_object_unref (src->cancel);
289   src->cancel = g_cancellable_new ();
290 
291   return TRUE;
292 }
293 
294 static GstFlowReturn
gst_gio_base_src_create(GstBaseSrc * base_src,guint64 offset,guint size,GstBuffer ** buf_return)295 gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
296     GstBuffer ** buf_return)
297 {
298   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
299   GstBuffer *buf;
300   GstFlowReturn ret = GST_FLOW_OK;
301 
302   g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR);
303 
304   /* If we have the requested part in our cache take a subbuffer of that,
305    * otherwise fill the cache again with at least 4096 bytes from the
306    * requested offset and return a subbuffer of that.
307    *
308    * We need caching because every read/seek operation will need to go
309    * over DBus if our backend is GVfs and this is painfully slow. */
310   if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
311       offset + size <= GST_BUFFER_OFFSET_END (src->cache)) {
312     GST_DEBUG_OBJECT (src, "Creating subbuffer from cached buffer: offset %"
313         G_GUINT64_FORMAT " length %u", offset, size);
314 
315     buf = gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL,
316         offset - GST_BUFFER_OFFSET (src->cache), size);
317 
318     GST_BUFFER_OFFSET (buf) = offset;
319     GST_BUFFER_OFFSET_END (buf) = offset + size;
320   } else {
321     guint cachesize = MAX (4096, size);
322     GstMapInfo map;
323     gssize read, streamread, res;
324     guint64 readoffset;
325     gboolean success, eos;
326     GError *err = NULL;
327     GstBuffer *newbuffer;
328     GstMemory *mem;
329 
330     newbuffer = gst_buffer_new ();
331 
332     /* copy any overlapping data from the cached buffer */
333     if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
334         offset <= GST_BUFFER_OFFSET_END (src->cache)) {
335       read = GST_BUFFER_OFFSET_END (src->cache) - offset;
336       GST_LOG_OBJECT (src,
337           "Copying %" G_GSSIZE_FORMAT " bytes from cached buffer at %"
338           G_GUINT64_FORMAT, read, offset - GST_BUFFER_OFFSET (src->cache));
339       gst_buffer_copy_into (newbuffer, src->cache, GST_BUFFER_COPY_MEMORY,
340           offset - GST_BUFFER_OFFSET (src->cache), read);
341     } else {
342       read = 0;
343     }
344 
345     if (src->cache)
346       gst_buffer_unref (src->cache);
347     src->cache = newbuffer;
348 
349     readoffset = offset + read;
350     GST_LOG_OBJECT (src,
351         "Reading %u bytes from offset %" G_GUINT64_FORMAT, cachesize,
352         readoffset);
353 
354     if (G_UNLIKELY (readoffset != src->position)) {
355       if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream))
356         return GST_FLOW_NOT_SUPPORTED;
357 
358       GST_DEBUG_OBJECT (src, "Seeking to position %" G_GUINT64_FORMAT,
359           readoffset);
360       ret =
361           gst_gio_seek (src, G_SEEKABLE (src->stream), readoffset, src->cancel);
362 
363       if (ret == GST_FLOW_OK)
364         src->position = readoffset;
365       else
366         return ret;
367     }
368 
369     /* GIO sometimes gives less bytes than requested although
370      * it's not at the end of file. SMB for example only
371      * supports reads up to 64k. So we loop here until we get at
372      * at least the requested amount of bytes or a read returns
373      * nothing. */
374     mem = gst_allocator_alloc (NULL, cachesize, NULL);
375     if (mem == NULL) {
376       GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", cachesize);
377       return GST_FLOW_ERROR;
378     }
379 
380     gst_memory_map (mem, &map, GST_MAP_WRITE);
381     streamread = 0;
382     while (size - read > 0 && (res =
383             g_input_stream_read (G_INPUT_STREAM (src->stream),
384                 map.data + streamread, cachesize - streamread, src->cancel,
385                 &err)) > 0) {
386       read += res;
387       streamread += res;
388       src->position += res;
389     }
390     gst_memory_unmap (mem, &map);
391     gst_buffer_append_memory (src->cache, mem);
392 
393     success = (read >= 0);
394     eos = (cachesize > 0 && read == 0);
395 
396     if (!success && !gst_gio_error (src, "g_input_stream_read", &err, &ret)) {
397       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
398           ("Could not read from stream: %s", err->message));
399       g_clear_error (&err);
400     }
401 
402     if (success && !eos) {
403       GST_BUFFER_OFFSET (src->cache) = offset;
404       GST_BUFFER_OFFSET_END (src->cache) = offset + read;
405 
406       GST_DEBUG_OBJECT (src, "Read successful");
407       GST_DEBUG_OBJECT (src, "Creating subbuffer from new "
408           "cached buffer: offset %" G_GUINT64_FORMAT " length %u", offset,
409           size);
410 
411       buf =
412           gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL, 0, MIN (size,
413               read));
414 
415       GST_BUFFER_OFFSET (buf) = offset;
416       GST_BUFFER_OFFSET_END (buf) = offset + MIN (size, read);
417     } else {
418       GST_DEBUG_OBJECT (src, "Read not successful");
419       gst_buffer_unref (src->cache);
420       src->cache = NULL;
421       buf = NULL;
422     }
423 
424     if (eos)
425       ret = GST_FLOW_EOS;
426   }
427 
428   *buf_return = buf;
429 
430   return ret;
431 }
432 
433 static gboolean
gst_gio_base_src_query(GstBaseSrc * base_src,GstQuery * query)434 gst_gio_base_src_query (GstBaseSrc * base_src, GstQuery * query)
435 {
436   gboolean ret = FALSE;
437   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
438 
439   switch (GST_QUERY_TYPE (query)) {
440     case GST_QUERY_URI:
441       if (GST_IS_URI_HANDLER (src)) {
442         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
443         gst_query_set_uri (query, uri);
444         g_free (uri);
445         ret = TRUE;
446       }
447       break;
448     default:
449       ret = FALSE;
450       break;
451   }
452 
453   if (!ret)
454     ret = GST_BASE_SRC_CLASS (parent_class)->query (base_src, query);
455 
456   return ret;
457 }
458