1 /* GStreamer
2 *
3 * Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
4 * Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include "gstgiobasesrc.h"
27
28 #include <gst/base/gsttypefindhelper.h>
29
/* Debug category for this file. It is initialised in
 * gst_gio_base_src_class_init() and, through GST_CAT_DEFAULT, is the
 * category used by the GST_*_OBJECT logging calls in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_gio_base_src_debug);
#define GST_CAT_DEFAULT gst_gio_base_src_debug

/* Template for the "src" pad: a source pad that is always present and
 * places no restriction on caps. It is installed on the element class in
 * gst_gio_base_src_class_init(). */
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Make the parent-class pointer declared by G_DEFINE_TYPE available under
 * the name parent_class (used when chaining up), then define the
 * GstGioBaseSrc type as a subtype of GST_TYPE_BASE_SRC. */
#define gst_gio_base_src_parent_class parent_class
G_DEFINE_TYPE (GstGioBaseSrc, gst_gio_base_src, GST_TYPE_BASE_SRC);
40
/* Forward declarations of the GObject and GstBaseSrc virtual method
 * implementations defined further down; gst_gio_base_src_class_init()
 * installs them on the class. */

/* GObject::finalize */
static void gst_gio_base_src_finalize (GObject * object);

/* GstBaseSrc virtual methods */
static gboolean gst_gio_base_src_start (GstBaseSrc * base_src);
static gboolean gst_gio_base_src_stop (GstBaseSrc * base_src);
static gboolean gst_gio_base_src_get_size (GstBaseSrc * base_src,
    guint64 * size);
static gboolean gst_gio_base_src_is_seekable (GstBaseSrc * base_src);
static gboolean gst_gio_base_src_unlock (GstBaseSrc * base_src);
static gboolean gst_gio_base_src_unlock_stop (GstBaseSrc * base_src);
static GstFlowReturn gst_gio_base_src_create (GstBaseSrc * base_src,
    guint64 offset, guint size, GstBuffer ** buf);
static gboolean gst_gio_base_src_query (GstBaseSrc * base_src,
    GstQuery * query);
54
55 static void
gst_gio_base_src_class_init(GstGioBaseSrcClass * klass)56 gst_gio_base_src_class_init (GstGioBaseSrcClass * klass)
57 {
58 GObjectClass *gobject_class = (GObjectClass *) klass;
59 GstElementClass *gstelement_class = (GstElementClass *) klass;
60 GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
61
62 GST_DEBUG_CATEGORY_INIT (gst_gio_base_src_debug, "gio_base_src", 0,
63 "GIO base source");
64
65 gobject_class->finalize = gst_gio_base_src_finalize;
66
67 gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
68
69 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_gio_base_src_start);
70 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_gio_base_src_stop);
71 gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_gio_base_src_get_size);
72 gstbasesrc_class->is_seekable =
73 GST_DEBUG_FUNCPTR (gst_gio_base_src_is_seekable);
74 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock);
75 gstbasesrc_class->unlock_stop =
76 GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock_stop);
77 gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_gio_base_src_create);
78 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_base_src_query);
79 }
80
81 static void
gst_gio_base_src_init(GstGioBaseSrc * src)82 gst_gio_base_src_init (GstGioBaseSrc * src)
83 {
84 src->cancel = g_cancellable_new ();
85 }
86
87 static void
gst_gio_base_src_finalize(GObject * object)88 gst_gio_base_src_finalize (GObject * object)
89 {
90 GstGioBaseSrc *src = GST_GIO_BASE_SRC (object);
91
92 if (src->cancel) {
93 g_object_unref (src->cancel);
94 src->cancel = NULL;
95 }
96
97 if (src->stream) {
98 g_object_unref (src->stream);
99 src->stream = NULL;
100 }
101
102 if (src->cache) {
103 gst_buffer_unref (src->cache);
104 src->cache = NULL;
105 }
106
107 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
108 }
109
110 static gboolean
gst_gio_base_src_start(GstBaseSrc * base_src)111 gst_gio_base_src_start (GstBaseSrc * base_src)
112 {
113 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
114 GstGioBaseSrcClass *gbsrc_class = GST_GIO_BASE_SRC_GET_CLASS (src);
115
116 src->position = 0;
117
118 /* FIXME: This will likely block */
119 src->stream = gbsrc_class->get_stream (src);
120 if (G_UNLIKELY (!G_IS_INPUT_STREAM (src->stream))) {
121 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
122 ("No input stream provided by subclass"));
123 return FALSE;
124 } else if (G_UNLIKELY (g_input_stream_is_closed (src->stream))) {
125 GST_ELEMENT_ERROR (src, LIBRARY, FAILED, (NULL),
126 ("Input stream is already closed"));
127 return FALSE;
128 }
129
130 if (G_IS_SEEKABLE (src->stream))
131 src->position = g_seekable_tell (G_SEEKABLE (src->stream));
132
133 GST_DEBUG_OBJECT (src, "started source");
134
135 return TRUE;
136 }
137
138 static gboolean
gst_gio_base_src_stop(GstBaseSrc * base_src)139 gst_gio_base_src_stop (GstBaseSrc * base_src)
140 {
141 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
142 GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
143 gboolean success;
144 GError *err = NULL;
145
146 if (klass->close_on_stop && G_IS_INPUT_STREAM (src->stream)) {
147 GST_DEBUG_OBJECT (src, "closing stream");
148
149 /* FIXME: can block but unfortunately we can't use async operations
150 * here because they require a running main loop */
151 success = g_input_stream_close (src->stream, src->cancel, &err);
152
153 if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) {
154 GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
155 ("g_input_stream_close failed: %s", err->message));
156 g_clear_error (&err);
157 } else if (!success) {
158 GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
159 ("g_input_stream_close failed"));
160 } else {
161 GST_DEBUG_OBJECT (src, "g_input_stream_close succeeded");
162 }
163
164 g_object_unref (src->stream);
165 src->stream = NULL;
166 } else {
167 g_object_unref (src->stream);
168 src->stream = NULL;
169 }
170
171 return TRUE;
172 }
173
174 static gboolean
gst_gio_base_src_get_size(GstBaseSrc * base_src,guint64 * size)175 gst_gio_base_src_get_size (GstBaseSrc * base_src, guint64 * size)
176 {
177 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
178
179 if (G_IS_FILE_INPUT_STREAM (src->stream)) {
180 GFileInfo *info;
181 GError *err = NULL;
182
183 info = g_file_input_stream_query_info (G_FILE_INPUT_STREAM (src->stream),
184 G_FILE_ATTRIBUTE_STANDARD_SIZE, src->cancel, &err);
185
186 if (info != NULL) {
187 *size = g_file_info_get_size (info);
188 g_object_unref (info);
189 GST_DEBUG_OBJECT (src, "found size: %" G_GUINT64_FORMAT, *size);
190 return TRUE;
191 }
192
193 if (!gst_gio_error (src, "g_file_input_stream_query_info", &err, NULL)) {
194
195 if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
196 GST_DEBUG_OBJECT (src, "size information not available");
197 else
198 GST_WARNING_OBJECT (src, "size information retrieval failed: %s",
199 err->message);
200
201 g_clear_error (&err);
202 }
203 }
204
205 if (GST_GIO_STREAM_IS_SEEKABLE (src->stream)) {
206 goffset old;
207 goffset stream_size;
208 gboolean ret;
209 GSeekable *seekable = G_SEEKABLE (src->stream);
210 GError *err = NULL;
211
212 old = g_seekable_tell (seekable);
213
214 ret = g_seekable_seek (seekable, 0, G_SEEK_END, src->cancel, &err);
215 if (!ret) {
216 if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
217 if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
218 GST_DEBUG_OBJECT (src,
219 "Seeking to the end of stream is not supported");
220 else
221 GST_WARNING_OBJECT (src, "Seeking to end of stream failed: %s",
222 err->message);
223 g_clear_error (&err);
224 } else {
225 GST_WARNING_OBJECT (src, "Seeking to end of stream failed");
226 }
227 return FALSE;
228 }
229
230 stream_size = g_seekable_tell (seekable);
231
232 ret = g_seekable_seek (seekable, old, G_SEEK_SET, src->cancel, &err);
233 if (!ret) {
234 if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
235 if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
236 GST_ERROR_OBJECT (src, "Seeking to the old position not supported");
237 else
238 GST_ERROR_OBJECT (src, "Seeking to the old position failed: %s",
239 err->message);
240 g_clear_error (&err);
241 } else {
242 GST_ERROR_OBJECT (src, "Seeking to the old position faile");
243 }
244 return FALSE;
245 }
246
247 if (stream_size >= 0) {
248 *size = stream_size;
249 return TRUE;
250 }
251 }
252
253 return FALSE;
254 }
255
256 static gboolean
gst_gio_base_src_is_seekable(GstBaseSrc * base_src)257 gst_gio_base_src_is_seekable (GstBaseSrc * base_src)
258 {
259 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
260 gboolean seekable;
261
262 seekable = GST_GIO_STREAM_IS_SEEKABLE (src->stream);
263
264 GST_DEBUG_OBJECT (src, "can seek: %d", seekable);
265
266 return seekable;
267 }
268
269 static gboolean
gst_gio_base_src_unlock(GstBaseSrc * base_src)270 gst_gio_base_src_unlock (GstBaseSrc * base_src)
271 {
272 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
273
274 GST_LOG_OBJECT (src, "triggering cancellation");
275
276 g_cancellable_cancel (src->cancel);
277
278 return TRUE;
279 }
280
281 static gboolean
gst_gio_base_src_unlock_stop(GstBaseSrc * base_src)282 gst_gio_base_src_unlock_stop (GstBaseSrc * base_src)
283 {
284 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
285
286 GST_LOG_OBJECT (src, "resetting cancellable");
287
288 g_object_unref (src->cancel);
289 src->cancel = g_cancellable_new ();
290
291 return TRUE;
292 }
293
294 static GstFlowReturn
gst_gio_base_src_create(GstBaseSrc * base_src,guint64 offset,guint size,GstBuffer ** buf_return)295 gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
296 GstBuffer ** buf_return)
297 {
298 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
299 GstBuffer *buf;
300 GstFlowReturn ret = GST_FLOW_OK;
301
302 g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR);
303
304 /* If we have the requested part in our cache take a subbuffer of that,
305 * otherwise fill the cache again with at least 4096 bytes from the
306 * requested offset and return a subbuffer of that.
307 *
308 * We need caching because every read/seek operation will need to go
309 * over DBus if our backend is GVfs and this is painfully slow. */
310 if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
311 offset + size <= GST_BUFFER_OFFSET_END (src->cache)) {
312 GST_DEBUG_OBJECT (src, "Creating subbuffer from cached buffer: offset %"
313 G_GUINT64_FORMAT " length %u", offset, size);
314
315 buf = gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL,
316 offset - GST_BUFFER_OFFSET (src->cache), size);
317
318 GST_BUFFER_OFFSET (buf) = offset;
319 GST_BUFFER_OFFSET_END (buf) = offset + size;
320 } else {
321 guint cachesize = MAX (4096, size);
322 GstMapInfo map;
323 gssize read, streamread, res;
324 guint64 readoffset;
325 gboolean success, eos;
326 GError *err = NULL;
327 GstBuffer *newbuffer;
328 GstMemory *mem;
329
330 newbuffer = gst_buffer_new ();
331
332 /* copy any overlapping data from the cached buffer */
333 if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
334 offset <= GST_BUFFER_OFFSET_END (src->cache)) {
335 read = GST_BUFFER_OFFSET_END (src->cache) - offset;
336 GST_LOG_OBJECT (src,
337 "Copying %" G_GSSIZE_FORMAT " bytes from cached buffer at %"
338 G_GUINT64_FORMAT, read, offset - GST_BUFFER_OFFSET (src->cache));
339 gst_buffer_copy_into (newbuffer, src->cache, GST_BUFFER_COPY_MEMORY,
340 offset - GST_BUFFER_OFFSET (src->cache), read);
341 } else {
342 read = 0;
343 }
344
345 if (src->cache)
346 gst_buffer_unref (src->cache);
347 src->cache = newbuffer;
348
349 readoffset = offset + read;
350 GST_LOG_OBJECT (src,
351 "Reading %u bytes from offset %" G_GUINT64_FORMAT, cachesize,
352 readoffset);
353
354 if (G_UNLIKELY (readoffset != src->position)) {
355 if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream))
356 return GST_FLOW_NOT_SUPPORTED;
357
358 GST_DEBUG_OBJECT (src, "Seeking to position %" G_GUINT64_FORMAT,
359 readoffset);
360 ret =
361 gst_gio_seek (src, G_SEEKABLE (src->stream), readoffset, src->cancel);
362
363 if (ret == GST_FLOW_OK)
364 src->position = readoffset;
365 else
366 return ret;
367 }
368
369 /* GIO sometimes gives less bytes than requested although
370 * it's not at the end of file. SMB for example only
371 * supports reads up to 64k. So we loop here until we get at
372 * at least the requested amount of bytes or a read returns
373 * nothing. */
374 mem = gst_allocator_alloc (NULL, cachesize, NULL);
375 if (mem == NULL) {
376 GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", cachesize);
377 return GST_FLOW_ERROR;
378 }
379
380 gst_memory_map (mem, &map, GST_MAP_WRITE);
381 streamread = 0;
382 while (size - read > 0 && (res =
383 g_input_stream_read (G_INPUT_STREAM (src->stream),
384 map.data + streamread, cachesize - streamread, src->cancel,
385 &err)) > 0) {
386 read += res;
387 streamread += res;
388 src->position += res;
389 }
390 gst_memory_unmap (mem, &map);
391 gst_buffer_append_memory (src->cache, mem);
392
393 success = (read >= 0);
394 eos = (cachesize > 0 && read == 0);
395
396 if (!success && !gst_gio_error (src, "g_input_stream_read", &err, &ret)) {
397 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
398 ("Could not read from stream: %s", err->message));
399 g_clear_error (&err);
400 }
401
402 if (success && !eos) {
403 GST_BUFFER_OFFSET (src->cache) = offset;
404 GST_BUFFER_OFFSET_END (src->cache) = offset + read;
405
406 GST_DEBUG_OBJECT (src, "Read successful");
407 GST_DEBUG_OBJECT (src, "Creating subbuffer from new "
408 "cached buffer: offset %" G_GUINT64_FORMAT " length %u", offset,
409 size);
410
411 buf =
412 gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL, 0, MIN (size,
413 read));
414
415 GST_BUFFER_OFFSET (buf) = offset;
416 GST_BUFFER_OFFSET_END (buf) = offset + MIN (size, read);
417 } else {
418 GST_DEBUG_OBJECT (src, "Read not successful");
419 gst_buffer_unref (src->cache);
420 src->cache = NULL;
421 buf = NULL;
422 }
423
424 if (eos)
425 ret = GST_FLOW_EOS;
426 }
427
428 *buf_return = buf;
429
430 return ret;
431 }
432
433 static gboolean
gst_gio_base_src_query(GstBaseSrc * base_src,GstQuery * query)434 gst_gio_base_src_query (GstBaseSrc * base_src, GstQuery * query)
435 {
436 gboolean ret = FALSE;
437 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
438
439 switch (GST_QUERY_TYPE (query)) {
440 case GST_QUERY_URI:
441 if (GST_IS_URI_HANDLER (src)) {
442 gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
443 gst_query_set_uri (query, uri);
444 g_free (uri);
445 ret = TRUE;
446 }
447 break;
448 default:
449 ret = FALSE;
450 break;
451 }
452
453 if (!ret)
454 ret = GST_BASE_SRC_CLASS (parent_class)->query (base_src, query);
455
456 return ret;
457 }
458