1 /* GStreamer
2 *
3 * Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
4 * Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include "gstgiobasesrc.h"
27 #include "gstgioelements.h"
28
29 GST_DEBUG_CATEGORY_STATIC (gst_gio_base_src_debug);
30 #define GST_CAT_DEFAULT gst_gio_base_src_debug
31
32 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
33 GST_PAD_SRC,
34 GST_PAD_ALWAYS,
35 GST_STATIC_CAPS_ANY);
36
37 #define gst_gio_base_src_parent_class parent_class
38 G_DEFINE_TYPE (GstGioBaseSrc, gst_gio_base_src, GST_TYPE_BASE_SRC);
39
40 static void gst_gio_base_src_finalize (GObject * object);
41
42 static gboolean gst_gio_base_src_start (GstBaseSrc * base_src);
43 static gboolean gst_gio_base_src_stop (GstBaseSrc * base_src);
44 static gboolean gst_gio_base_src_get_size (GstBaseSrc * base_src,
45 guint64 * size);
46 static gboolean gst_gio_base_src_is_seekable (GstBaseSrc * base_src);
47 static gboolean gst_gio_base_src_unlock (GstBaseSrc * base_src);
48 static gboolean gst_gio_base_src_unlock_stop (GstBaseSrc * base_src);
49 static GstFlowReturn gst_gio_base_src_create (GstBaseSrc * base_src,
50 guint64 offset, guint size, GstBuffer ** buf);
51 static gboolean gst_gio_base_src_query (GstBaseSrc * base_src,
52 GstQuery * query);
53
54 static void
gst_gio_base_src_class_init(GstGioBaseSrcClass * klass)55 gst_gio_base_src_class_init (GstGioBaseSrcClass * klass)
56 {
57 GObjectClass *gobject_class = (GObjectClass *) klass;
58 GstElementClass *gstelement_class = (GstElementClass *) klass;
59 GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
60
61 GST_DEBUG_CATEGORY_INIT (gst_gio_base_src_debug, "gio_base_src", 0,
62 "GIO base source");
63
64 gobject_class->finalize = gst_gio_base_src_finalize;
65
66 gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
67
68 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_gio_base_src_start);
69 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_gio_base_src_stop);
70 gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_gio_base_src_get_size);
71 gstbasesrc_class->is_seekable =
72 GST_DEBUG_FUNCPTR (gst_gio_base_src_is_seekable);
73 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock);
74 gstbasesrc_class->unlock_stop =
75 GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock_stop);
76 gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_gio_base_src_create);
77 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_base_src_query);
78
79 gst_type_mark_as_plugin_api (GST_TYPE_GIO_BASE_SRC, 0);
80 }
81
82 static void
gst_gio_base_src_init(GstGioBaseSrc * src)83 gst_gio_base_src_init (GstGioBaseSrc * src)
84 {
85 src->cancel = g_cancellable_new ();
86 }
87
88 static void
gst_gio_base_src_finalize(GObject * object)89 gst_gio_base_src_finalize (GObject * object)
90 {
91 GstGioBaseSrc *src = GST_GIO_BASE_SRC (object);
92
93 if (src->cancel) {
94 g_object_unref (src->cancel);
95 src->cancel = NULL;
96 }
97
98 if (src->stream) {
99 g_object_unref (src->stream);
100 src->stream = NULL;
101 }
102
103 if (src->cache) {
104 gst_buffer_unref (src->cache);
105 src->cache = NULL;
106 }
107
108 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
109 }
110
111 static gboolean
gst_gio_base_src_start(GstBaseSrc * base_src)112 gst_gio_base_src_start (GstBaseSrc * base_src)
113 {
114 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
115 GstGioBaseSrcClass *gbsrc_class = GST_GIO_BASE_SRC_GET_CLASS (src);
116
117 src->position = 0;
118
119 /* FIXME: This will likely block */
120 src->stream = gbsrc_class->get_stream (src);
121 if (G_UNLIKELY (!G_IS_INPUT_STREAM (src->stream))) {
122 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
123 ("No input stream provided by subclass"));
124 return FALSE;
125 } else if (G_UNLIKELY (g_input_stream_is_closed (src->stream))) {
126 GST_ELEMENT_ERROR (src, LIBRARY, FAILED, (NULL),
127 ("Input stream is already closed"));
128 return FALSE;
129 }
130
131 if (G_IS_SEEKABLE (src->stream))
132 src->position = g_seekable_tell (G_SEEKABLE (src->stream));
133
134 GST_DEBUG_OBJECT (src, "started source");
135
136 return TRUE;
137 }
138
139 static gboolean
gst_gio_base_src_stop(GstBaseSrc * base_src)140 gst_gio_base_src_stop (GstBaseSrc * base_src)
141 {
142 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
143 GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
144 gboolean success;
145 GError *err = NULL;
146
147 if (klass->close_on_stop && G_IS_INPUT_STREAM (src->stream)) {
148 GST_DEBUG_OBJECT (src, "closing stream");
149
150 /* FIXME: can block but unfortunately we can't use async operations
151 * here because they require a running main loop */
152 success = g_input_stream_close (src->stream, src->cancel, &err);
153
154 if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) {
155 GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
156 ("g_input_stream_close failed: %s", err->message));
157 g_clear_error (&err);
158 } else if (!success) {
159 GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
160 ("g_input_stream_close failed"));
161 } else {
162 GST_DEBUG_OBJECT (src, "g_input_stream_close succeeded");
163 }
164
165 g_object_unref (src->stream);
166 src->stream = NULL;
167 } else {
168 g_object_unref (src->stream);
169 src->stream = NULL;
170 }
171
172 return TRUE;
173 }
174
175 static gboolean
gst_gio_base_src_get_size(GstBaseSrc * base_src,guint64 * size)176 gst_gio_base_src_get_size (GstBaseSrc * base_src, guint64 * size)
177 {
178 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
179
180 if (G_IS_FILE_INPUT_STREAM (src->stream)) {
181 GFileInfo *info;
182 GError *err = NULL;
183
184 info = g_file_input_stream_query_info (G_FILE_INPUT_STREAM (src->stream),
185 G_FILE_ATTRIBUTE_STANDARD_SIZE, src->cancel, &err);
186
187 if (info != NULL) {
188 *size = g_file_info_get_size (info);
189 g_object_unref (info);
190 GST_DEBUG_OBJECT (src, "found size: %" G_GUINT64_FORMAT, *size);
191 return TRUE;
192 }
193
194 if (!gst_gio_error (src, "g_file_input_stream_query_info", &err, NULL)) {
195
196 if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
197 GST_DEBUG_OBJECT (src, "size information not available");
198 else
199 GST_WARNING_OBJECT (src, "size information retrieval failed: %s",
200 err->message);
201
202 g_clear_error (&err);
203 }
204 }
205
206 if (GST_GIO_STREAM_IS_SEEKABLE (src->stream)) {
207 goffset old;
208 goffset stream_size;
209 gboolean ret;
210 GSeekable *seekable = G_SEEKABLE (src->stream);
211 GError *err = NULL;
212
213 old = g_seekable_tell (seekable);
214
215 ret = g_seekable_seek (seekable, 0, G_SEEK_END, src->cancel, &err);
216 if (!ret) {
217 if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
218 if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
219 GST_DEBUG_OBJECT (src,
220 "Seeking to the end of stream is not supported");
221 else
222 GST_WARNING_OBJECT (src, "Seeking to end of stream failed: %s",
223 err->message);
224 g_clear_error (&err);
225 } else {
226 GST_WARNING_OBJECT (src, "Seeking to end of stream failed");
227 }
228 return FALSE;
229 }
230
231 stream_size = g_seekable_tell (seekable);
232
233 ret = g_seekable_seek (seekable, old, G_SEEK_SET, src->cancel, &err);
234 if (!ret) {
235 if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
236 if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
237 GST_ERROR_OBJECT (src, "Seeking to the old position not supported");
238 else
239 GST_ERROR_OBJECT (src, "Seeking to the old position failed: %s",
240 err->message);
241 g_clear_error (&err);
242 } else {
243 GST_ERROR_OBJECT (src, "Seeking to the old position failed");
244 }
245 return FALSE;
246 }
247
248 if (stream_size >= 0) {
249 *size = stream_size;
250 return TRUE;
251 }
252 }
253
254 return FALSE;
255 }
256
257 static gboolean
gst_gio_base_src_is_seekable(GstBaseSrc * base_src)258 gst_gio_base_src_is_seekable (GstBaseSrc * base_src)
259 {
260 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
261 gboolean seekable;
262
263 seekable = GST_GIO_STREAM_IS_SEEKABLE (src->stream);
264
265 GST_DEBUG_OBJECT (src, "can seek: %d", seekable);
266
267 return seekable;
268 }
269
270 static gboolean
gst_gio_base_src_unlock(GstBaseSrc * base_src)271 gst_gio_base_src_unlock (GstBaseSrc * base_src)
272 {
273 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
274
275 GST_LOG_OBJECT (src, "triggering cancellation");
276
277 g_cancellable_cancel (src->cancel);
278
279 return TRUE;
280 }
281
282 static gboolean
gst_gio_base_src_unlock_stop(GstBaseSrc * base_src)283 gst_gio_base_src_unlock_stop (GstBaseSrc * base_src)
284 {
285 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
286
287 GST_LOG_OBJECT (src, "resetting cancellable");
288
289 g_object_unref (src->cancel);
290 src->cancel = g_cancellable_new ();
291
292 return TRUE;
293 }
294
295 static GstFlowReturn
gst_gio_base_src_create(GstBaseSrc * base_src,guint64 offset,guint size,GstBuffer ** buf_return)296 gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
297 GstBuffer ** buf_return)
298 {
299 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
300 GstBuffer *buf;
301 GstFlowReturn ret = GST_FLOW_OK;
302
303 g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR);
304
305 /* If we have the requested part in our cache take a subbuffer of that,
306 * otherwise fill the cache again with at least 4096 bytes from the
307 * requested offset and return a subbuffer of that.
308 *
309 * We need caching because every read/seek operation will need to go
310 * over DBus if our backend is GVfs and this is painfully slow. */
311 if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
312 offset + size <= GST_BUFFER_OFFSET_END (src->cache)) {
313 GST_DEBUG_OBJECT (src, "Creating subbuffer from cached buffer: offset %"
314 G_GUINT64_FORMAT " length %u", offset, size);
315
316 buf = gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL,
317 offset - GST_BUFFER_OFFSET (src->cache), size);
318
319 GST_BUFFER_OFFSET (buf) = offset;
320 GST_BUFFER_OFFSET_END (buf) = offset + size;
321 } else {
322 guint cachesize = MAX (4096, size);
323 GstMapInfo map;
324 gssize read, streamread, res;
325 guint64 readoffset;
326 gboolean success, eos;
327 GError *err = NULL;
328 GstBuffer *newbuffer;
329 GstMemory *mem;
330 gboolean waited_for_data = FALSE;
331 GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
332
333 newbuffer = gst_buffer_new ();
334
335 /* copy any overlapping data from the cached buffer */
336 if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
337 offset <= GST_BUFFER_OFFSET_END (src->cache)) {
338 read = GST_BUFFER_OFFSET_END (src->cache) - offset;
339 GST_LOG_OBJECT (src,
340 "Copying %" G_GSSIZE_FORMAT " bytes from cached buffer at %"
341 G_GUINT64_FORMAT, read, offset - GST_BUFFER_OFFSET (src->cache));
342 gst_buffer_copy_into (newbuffer, src->cache, GST_BUFFER_COPY_MEMORY,
343 offset - GST_BUFFER_OFFSET (src->cache), read);
344 } else {
345 read = 0;
346 }
347
348 if (src->cache)
349 gst_buffer_unref (src->cache);
350 src->cache = newbuffer;
351
352 readoffset = offset + read;
353 GST_LOG_OBJECT (src,
354 "Reading %u bytes from offset %" G_GUINT64_FORMAT, cachesize,
355 readoffset);
356
357 if (G_UNLIKELY (readoffset != src->position)) {
358 if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream))
359 return GST_FLOW_NOT_SUPPORTED;
360
361 GST_DEBUG_OBJECT (src, "Seeking to position %" G_GUINT64_FORMAT,
362 readoffset);
363 ret =
364 gst_gio_seek (src, G_SEEKABLE (src->stream), readoffset, src->cancel);
365
366 if (ret == GST_FLOW_OK)
367 src->position = readoffset;
368 else
369 return ret;
370 }
371
372 /* GIO sometimes gives less bytes than requested although
373 * it's not at the end of file. SMB for example only
374 * supports reads up to 64k. So we loop here until we get at
375 * at least the requested amount of bytes or a read returns
376 * nothing. */
377 mem = gst_allocator_alloc (NULL, cachesize, NULL);
378 if (mem == NULL) {
379 GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", cachesize);
380 return GST_FLOW_ERROR;
381 }
382
383 gst_memory_map (mem, &map, GST_MAP_WRITE);
384 streamread = 0;
385 while (size - read > 0 && (res =
386 g_input_stream_read (G_INPUT_STREAM (src->stream),
387 map.data + streamread, cachesize - streamread, src->cancel,
388 &err)) >= 0) {
389
390 read += res;
391 streamread += res;
392 src->position += res;
393
394 if (res != 0)
395 continue;
396
397 if (!klass->wait_for_data || !klass->wait_for_data (src))
398 break;
399
400 waited_for_data = TRUE;
401 }
402
403 if (waited_for_data && klass->waited_for_data)
404 klass->waited_for_data (src);
405
406 gst_memory_unmap (mem, &map);
407 gst_buffer_append_memory (src->cache, mem);
408
409 success = (read >= 0);
410 eos = (cachesize > 0 && read == 0);
411
412 if (!success && !gst_gio_error (src, "g_input_stream_read", &err, &ret)) {
413 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
414 ("Could not read from stream: %s", err->message));
415 g_clear_error (&err);
416 }
417
418 if (success && !eos) {
419 GST_BUFFER_OFFSET (src->cache) = offset;
420 GST_BUFFER_OFFSET_END (src->cache) = offset + read;
421
422 GST_DEBUG_OBJECT (src, "Read successful");
423 GST_DEBUG_OBJECT (src, "Creating subbuffer from new "
424 "cached buffer: offset %" G_GUINT64_FORMAT " length %u", offset,
425 size);
426
427 buf =
428 gst_buffer_copy_region (src->cache, GST_BUFFER_COPY_ALL, 0, MIN (size,
429 read));
430
431 GST_BUFFER_OFFSET (buf) = offset;
432 GST_BUFFER_OFFSET_END (buf) = offset + MIN (size, read);
433 } else {
434 GST_DEBUG_OBJECT (src, "Read not successful");
435 gst_buffer_unref (src->cache);
436 src->cache = NULL;
437 buf = NULL;
438 }
439
440 if (eos)
441 ret = GST_FLOW_EOS;
442 }
443
444 *buf_return = buf;
445
446 return ret;
447 }
448
449 static gboolean
gst_gio_base_src_query(GstBaseSrc * base_src,GstQuery * query)450 gst_gio_base_src_query (GstBaseSrc * base_src, GstQuery * query)
451 {
452 gboolean ret = FALSE;
453 GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
454
455 switch (GST_QUERY_TYPE (query)) {
456 case GST_QUERY_URI:
457 if (GST_IS_URI_HANDLER (src)) {
458 gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
459 gst_query_set_uri (query, uri);
460 g_free (uri);
461 ret = TRUE;
462 }
463 break;
464 default:
465 ret = FALSE;
466 break;
467 }
468
469 if (!ret)
470 ret = GST_BASE_SRC_CLASS (parent_class)->query (base_src, query);
471
472 return ret;
473 }
474