• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
4  * Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 
26 #include "gstgiobasesrc.h"
27 #include "gstgioelements.h"
28 
29 GST_DEBUG_CATEGORY_STATIC (gst_gio_base_src_debug);
30 #define GST_CAT_DEFAULT gst_gio_base_src_debug
31 
32 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
33     GST_PAD_SRC,
34     GST_PAD_ALWAYS,
35     GST_STATIC_CAPS_ANY);
36 
37 #define gst_gio_base_src_parent_class parent_class
38 G_DEFINE_TYPE (GstGioBaseSrc, gst_gio_base_src, GST_TYPE_BASE_SRC);
39 
40 static void gst_gio_base_src_finalize (GObject * object);
41 
42 static gboolean gst_gio_base_src_start (GstBaseSrc * base_src);
43 static gboolean gst_gio_base_src_stop (GstBaseSrc * base_src);
44 static gboolean gst_gio_base_src_get_size (GstBaseSrc * base_src,
45     guint64 * size);
46 static gboolean gst_gio_base_src_is_seekable (GstBaseSrc * base_src);
47 static gboolean gst_gio_base_src_unlock (GstBaseSrc * base_src);
48 static gboolean gst_gio_base_src_unlock_stop (GstBaseSrc * base_src);
49 static GstFlowReturn gst_gio_base_src_create (GstBaseSrc * base_src,
50     guint64 offset, guint size, GstBuffer ** buf);
51 static gboolean gst_gio_base_src_query (GstBaseSrc * base_src,
52     GstQuery * query);
53 
54 static void
gst_gio_base_src_class_init(GstGioBaseSrcClass * klass)55 gst_gio_base_src_class_init (GstGioBaseSrcClass * klass)
56 {
57   GObjectClass *gobject_class = (GObjectClass *) klass;
58   GstElementClass *gstelement_class = (GstElementClass *) klass;
59   GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
60 
61   GST_DEBUG_CATEGORY_INIT (gst_gio_base_src_debug, "gio_base_src", 0,
62       "GIO base source");
63 
64   gobject_class->finalize = gst_gio_base_src_finalize;
65 
66   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
67 
68   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_gio_base_src_start);
69   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_gio_base_src_stop);
70   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_gio_base_src_get_size);
71   gstbasesrc_class->is_seekable =
72       GST_DEBUG_FUNCPTR (gst_gio_base_src_is_seekable);
73   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock);
74   gstbasesrc_class->unlock_stop =
75       GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock_stop);
76   gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_gio_base_src_create);
77   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_base_src_query);
78 
79   gst_type_mark_as_plugin_api (GST_TYPE_GIO_BASE_SRC, 0);
80 }
81 
82 static void
gst_gio_base_src_init(GstGioBaseSrc * src)83 gst_gio_base_src_init (GstGioBaseSrc * src)
84 {
85   src->cancel = g_cancellable_new ();
86 }
87 
88 static void
gst_gio_base_src_finalize(GObject * object)89 gst_gio_base_src_finalize (GObject * object)
90 {
91   GstGioBaseSrc *src = GST_GIO_BASE_SRC (object);
92 
93   if (src->cancel) {
94     g_object_unref (src->cancel);
95     src->cancel = NULL;
96   }
97 
98   if (src->stream) {
99     g_object_unref (src->stream);
100     src->stream = NULL;
101   }
102 
103   if (src->cache) {
104     gst_buffer_unref (src->cache);
105     src->cache = NULL;
106   }
107 
108   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
109 }
110 
111 static gboolean
gst_gio_base_src_start(GstBaseSrc * base_src)112 gst_gio_base_src_start (GstBaseSrc * base_src)
113 {
114   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
115   GstGioBaseSrcClass *gbsrc_class = GST_GIO_BASE_SRC_GET_CLASS (src);
116 
117   src->position = 0;
118 
119   /* FIXME: This will likely block */
120   src->stream = gbsrc_class->get_stream (src);
121   if (G_UNLIKELY (!G_IS_INPUT_STREAM (src->stream))) {
122     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
123         ("No input stream provided by subclass"));
124     return FALSE;
125   } else if (G_UNLIKELY (g_input_stream_is_closed (src->stream))) {
126     GST_ELEMENT_ERROR (src, LIBRARY, FAILED, (NULL),
127         ("Input stream is already closed"));
128     return FALSE;
129   }
130 
131   if (G_IS_SEEKABLE (src->stream))
132     src->position = g_seekable_tell (G_SEEKABLE (src->stream));
133 
134   GST_DEBUG_OBJECT (src, "started source");
135 
136   return TRUE;
137 }
138 
139 static gboolean
gst_gio_base_src_stop(GstBaseSrc * base_src)140 gst_gio_base_src_stop (GstBaseSrc * base_src)
141 {
142   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
143   GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
144   gboolean success;
145   GError *err = NULL;
146 
147   if (klass->close_on_stop && G_IS_INPUT_STREAM (src->stream)) {
148     GST_DEBUG_OBJECT (src, "closing stream");
149 
150     /* FIXME: can block but unfortunately we can't use async operations
151      * here because they require a running main loop */
152     success = g_input_stream_close (src->stream, src->cancel, &err);
153 
154     if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) {
155       GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
156           ("g_input_stream_close failed: %s", err->message));
157       g_clear_error (&err);
158     } else if (!success) {
159       GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
160           ("g_input_stream_close failed"));
161     } else {
162       GST_DEBUG_OBJECT (src, "g_input_stream_close succeeded");
163     }
164 
165     g_object_unref (src->stream);
166     src->stream = NULL;
167   } else {
168     g_object_unref (src->stream);
169     src->stream = NULL;
170   }
171 
172   return TRUE;
173 }
174 
175 static gboolean
gst_gio_base_src_get_size(GstBaseSrc * base_src,guint64 * size)176 gst_gio_base_src_get_size (GstBaseSrc * base_src, guint64 * size)
177 {
178   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
179 
180   if (G_IS_FILE_INPUT_STREAM (src->stream)) {
181     GFileInfo *info;
182     GError *err = NULL;
183 
184     info = g_file_input_stream_query_info (G_FILE_INPUT_STREAM (src->stream),
185         G_FILE_ATTRIBUTE_STANDARD_SIZE, src->cancel, &err);
186 
187     if (info != NULL) {
188       *size = g_file_info_get_size (info);
189       g_object_unref (info);
190       GST_DEBUG_OBJECT (src, "found size: %" G_GUINT64_FORMAT, *size);
191       return TRUE;
192     }
193 
194     if (!gst_gio_error (src, "g_file_input_stream_query_info", &err, NULL)) {
195 
196       if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
197         GST_DEBUG_OBJECT (src, "size information not available");
198       else
199         GST_WARNING_OBJECT (src, "size information retrieval failed: %s",
200             err->message);
201 
202       g_clear_error (&err);
203     }
204   }
205 
206   if (GST_GIO_STREAM_IS_SEEKABLE (src->stream)) {
207     goffset old;
208     goffset stream_size;
209     gboolean ret;
210     GSeekable *seekable = G_SEEKABLE (src->stream);
211     GError *err = NULL;
212 
213     old = g_seekable_tell (seekable);
214 
215     ret = g_seekable_seek (seekable, 0, G_SEEK_END, src->cancel, &err);
216     if (!ret) {
217       if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
218         if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
219           GST_DEBUG_OBJECT (src,
220               "Seeking to the end of stream is not supported");
221         else
222           GST_WARNING_OBJECT (src, "Seeking to end of stream failed: %s",
223               err->message);
224         g_clear_error (&err);
225       } else {
226         GST_WARNING_OBJECT (src, "Seeking to end of stream failed");
227       }
228       return FALSE;
229     }
230 
231     stream_size = g_seekable_tell (seekable);
232 
233     ret = g_seekable_seek (seekable, old, G_SEEK_SET, src->cancel, &err);
234     if (!ret) {
235       if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
236         if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
237           GST_ERROR_OBJECT (src, "Seeking to the old position not supported");
238         else
239           GST_ERROR_OBJECT (src, "Seeking to the old position failed: %s",
240               err->message);
241         g_clear_error (&err);
242       } else {
243         GST_ERROR_OBJECT (src, "Seeking to the old position failed");
244       }
245       return FALSE;
246     }
247 
248     if (stream_size >= 0) {
249       *size = stream_size;
250       return TRUE;
251     }
252   }
253 
254   return FALSE;
255 }
256 
257 static gboolean
gst_gio_base_src_is_seekable(GstBaseSrc * base_src)258 gst_gio_base_src_is_seekable (GstBaseSrc * base_src)
259 {
260   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
261   gboolean seekable;
262 
263   seekable = GST_GIO_STREAM_IS_SEEKABLE (src->stream);
264 
265   GST_DEBUG_OBJECT (src, "can seek: %d", seekable);
266 
267   return seekable;
268 }
269 
270 static gboolean
gst_gio_base_src_unlock(GstBaseSrc * base_src)271 gst_gio_base_src_unlock (GstBaseSrc * base_src)
272 {
273   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
274 
275   GST_LOG_OBJECT (src, "triggering cancellation");
276 
277   g_cancellable_cancel (src->cancel);
278 
279   return TRUE;
280 }
281 
282 static gboolean
gst_gio_base_src_unlock_stop(GstBaseSrc * base_src)283 gst_gio_base_src_unlock_stop (GstBaseSrc * base_src)
284 {
285   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
286 
287   GST_LOG_OBJECT (src, "resetting cancellable");
288 
289   g_object_unref (src->cancel);
290   src->cancel = g_cancellable_new ();
291 
292   return TRUE;
293 }
294 
295 static GstFlowReturn
gst_gio_base_src_create(GstBaseSrc * base_src,guint64 offset,guint size,GstBuffer ** buf_return)296 gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
297     GstBuffer ** buf_return)
298 {
299   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
300   GstBuffer *buf;
301   GstFlowReturn ret = GST_FLOW_OK;
302 
303   g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR);
304 
305   /* If we have the requested part in our cache take a subbuffer of that,
306    * otherwise fill the cache again with at least 4096 bytes from the
307    * requested offset and return a subbuffer of that.
308    *
309    * We need caching because every read/seek operation will need to go
310    * over DBus if our backend is GVfs and this is painfully slow. */
311   if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
312       offset + size <= GST_BUFFER_OFFSET_END (src->cache)) {
313     GST_DEBUG_OBJECT (src, "Creating subbuffer from cached buffer: offset %"
314         G_GUINT64_FORMAT " length %u", offset, size);
315 
316     buf = gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL,
317         offset - GST_BUFFER_OFFSET (src->cache), size);
318 
319     GST_BUFFER_OFFSET (buf) = offset;
320     GST_BUFFER_OFFSET_END (buf) = offset + size;
321   } else {
322     guint cachesize = MAX (4096, size);
323     GstMapInfo map;
324     gssize read, streamread, res;
325     guint64 readoffset;
326     gboolean success, eos;
327     GError *err = NULL;
328     GstBuffer *newbuffer;
329     GstMemory *mem;
330     gboolean waited_for_data = FALSE;
331     GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
332 
333     newbuffer = gst_buffer_new ();
334 
335     /* copy any overlapping data from the cached buffer */
336     if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
337         offset <= GST_BUFFER_OFFSET_END (src->cache)) {
338       read = GST_BUFFER_OFFSET_END (src->cache) - offset;
339       GST_LOG_OBJECT (src,
340           "Copying %" G_GSSIZE_FORMAT " bytes from cached buffer at %"
341           G_GUINT64_FORMAT, read, offset - GST_BUFFER_OFFSET (src->cache));
342       gst_buffer_copy_into (newbuffer, src->cache, GST_BUFFER_COPY_MEMORY,
343           offset - GST_BUFFER_OFFSET (src->cache), read);
344     } else {
345       read = 0;
346     }
347 
348     if (src->cache)
349       gst_buffer_unref (src->cache);
350     src->cache = newbuffer;
351 
352     readoffset = offset + read;
353     GST_LOG_OBJECT (src,
354         "Reading %u bytes from offset %" G_GUINT64_FORMAT, cachesize,
355         readoffset);
356 
357     if (G_UNLIKELY (readoffset != src->position)) {
358       if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream))
359         return GST_FLOW_NOT_SUPPORTED;
360 
361       GST_DEBUG_OBJECT (src, "Seeking to position %" G_GUINT64_FORMAT,
362           readoffset);
363       ret =
364           gst_gio_seek (src, G_SEEKABLE (src->stream), readoffset, src->cancel);
365 
366       if (ret == GST_FLOW_OK)
367         src->position = readoffset;
368       else
369         return ret;
370     }
371 
372     /* GIO sometimes gives less bytes than requested although
373      * it's not at the end of file. SMB for example only
374      * supports reads up to 64k. So we loop here until we get at
375      * at least the requested amount of bytes or a read returns
376      * nothing. */
377     mem = gst_allocator_alloc (NULL, cachesize, NULL);
378     if (mem == NULL) {
379       GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", cachesize);
380       return GST_FLOW_ERROR;
381     }
382 
383     gst_memory_map (mem, &map, GST_MAP_WRITE);
384     streamread = 0;
385     while (size - read > 0 && (res =
386             g_input_stream_read (G_INPUT_STREAM (src->stream),
387                 map.data + streamread, cachesize - streamread, src->cancel,
388                 &err)) >= 0) {
389 
390       read += res;
391       streamread += res;
392       src->position += res;
393 
394       if (res != 0)
395         continue;
396 
397       if (!klass->wait_for_data || !klass->wait_for_data (src))
398         break;
399 
400       waited_for_data = TRUE;
401     }
402 
403     if (waited_for_data && klass->waited_for_data)
404       klass->waited_for_data (src);
405 
406     gst_memory_unmap (mem, &map);
407     gst_buffer_append_memory (src->cache, mem);
408 
409     success = (read >= 0);
410     eos = (cachesize > 0 && read == 0);
411 
412     if (!success && !gst_gio_error (src, "g_input_stream_read", &err, &ret)) {
413       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
414           ("Could not read from stream: %s", err->message));
415       g_clear_error (&err);
416     }
417 
418     if (success && !eos) {
419       GST_BUFFER_OFFSET (src->cache) = offset;
420       GST_BUFFER_OFFSET_END (src->cache) = offset + read;
421 
422       GST_DEBUG_OBJECT (src, "Read successful");
423       GST_DEBUG_OBJECT (src, "Creating subbuffer from new "
424           "cached buffer: offset %" G_GUINT64_FORMAT " length %u", offset,
425           size);
426 
427       buf =
428           gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL, 0, MIN (size,
429               read));
430 
431       GST_BUFFER_OFFSET (buf) = offset;
432       GST_BUFFER_OFFSET_END (buf) = offset + MIN (size, read);
433     } else {
434       GST_DEBUG_OBJECT (src, "Read not successful");
435       gst_buffer_unref (src->cache);
436       src->cache = NULL;
437       buf = NULL;
438     }
439 
440     if (eos)
441       ret = GST_FLOW_EOS;
442   }
443 
444   *buf_return = buf;
445 
446   return ret;
447 }
448 
449 static gboolean
gst_gio_base_src_query(GstBaseSrc * base_src,GstQuery * query)450 gst_gio_base_src_query (GstBaseSrc * base_src, GstQuery * query)
451 {
452   gboolean ret = FALSE;
453   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
454 
455   switch (GST_QUERY_TYPE (query)) {
456     case GST_QUERY_URI:
457       if (GST_IS_URI_HANDLER (src)) {
458         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
459         gst_query_set_uri (query, uri);
460         g_free (uri);
461         ret = TRUE;
462       }
463       break;
464     default:
465       ret = FALSE;
466       break;
467   }
468 
469   if (!ret)
470     ret = GST_BASE_SRC_CLASS (parent_class)->query (base_src, query);
471 
472   return ret;
473 }
474