1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19 * Boston, MA 02111-1307, USA.
20 *
21 * Author: Christian Kellner <gicmo@gnome.org>
22 */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34 #include "gioalias.h"
35
36 /**
37 * SECTION:gbufferedinputstream
38 * @short_description: Buffered Input Stream
39 * @include: gio/gio.h
40 * @see_also: #GFilterInputStream, #GInputStream
41 *
42 * Buffered input stream implements #GFilterInputStream and provides
43 * for buffered reads.
44 *
45 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46 *
47 * To create a buffered input stream, use g_buffered_input_stream_new(),
48 * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49 * construction.
50 *
51 * To get the size of a buffer within a buffered input stream, use
52 * g_buffered_input_stream_get_buffer_size(). To change the size of a
53 * buffered input stream's buffer, use
54 * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
55 * cannot be reduced below the size of the data within the buffer.
56 *
57 **/
58
59
60
61 #define DEFAULT_BUFFER_SIZE 4096
62
63 struct _GBufferedInputStreamPrivate {
64 guint8 *buffer;
65 gsize len;
66 gsize pos;
67 gsize end;
68 GAsyncReadyCallback outstanding_callback;
69 };
70
71 enum {
72 PROP_0,
73 PROP_BUFSIZE
74 };
75
76 static void g_buffered_input_stream_set_property (GObject *object,
77 guint prop_id,
78 const GValue *value,
79 GParamSpec *pspec);
80
81 static void g_buffered_input_stream_get_property (GObject *object,
82 guint prop_id,
83 GValue *value,
84 GParamSpec *pspec);
85 static void g_buffered_input_stream_finalize (GObject *object);
86
87
88 static gssize g_buffered_input_stream_skip (GInputStream *stream,
89 gsize count,
90 GCancellable *cancellable,
91 GError **error);
92 static void g_buffered_input_stream_skip_async (GInputStream *stream,
93 gsize count,
94 int io_priority,
95 GCancellable *cancellable,
96 GAsyncReadyCallback callback,
97 gpointer user_data);
98 static gssize g_buffered_input_stream_skip_finish (GInputStream *stream,
99 GAsyncResult *result,
100 GError **error);
101 static gssize g_buffered_input_stream_read (GInputStream *stream,
102 void *buffer,
103 gsize count,
104 GCancellable *cancellable,
105 GError **error);
106 static void g_buffered_input_stream_read_async (GInputStream *stream,
107 void *buffer,
108 gsize count,
109 int io_priority,
110 GCancellable *cancellable,
111 GAsyncReadyCallback callback,
112 gpointer user_data);
113 static gssize g_buffered_input_stream_read_finish (GInputStream *stream,
114 GAsyncResult *result,
115 GError **error);
116 static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
117 gssize count,
118 GCancellable *cancellable,
119 GError **error);
120 static void g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
121 gssize count,
122 int io_priority,
123 GCancellable *cancellable,
124 GAsyncReadyCallback callback,
125 gpointer user_data);
126 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
127 GAsyncResult *result,
128 GError **error);
129
130 static void compact_buffer (GBufferedInputStream *stream);
131
G_DEFINE_TYPE(GBufferedInputStream,g_buffered_input_stream,G_TYPE_FILTER_INPUT_STREAM)132 G_DEFINE_TYPE (GBufferedInputStream,
133 g_buffered_input_stream,
134 G_TYPE_FILTER_INPUT_STREAM)
135
136
137 static void
138 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
139 {
140 GObjectClass *object_class;
141 GInputStreamClass *istream_class;
142 GBufferedInputStreamClass *bstream_class;
143
144 g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
145
146 object_class = G_OBJECT_CLASS (klass);
147 object_class->get_property = g_buffered_input_stream_get_property;
148 object_class->set_property = g_buffered_input_stream_set_property;
149 object_class->finalize = g_buffered_input_stream_finalize;
150
151 istream_class = G_INPUT_STREAM_CLASS (klass);
152 istream_class->skip = g_buffered_input_stream_skip;
153 istream_class->skip_async = g_buffered_input_stream_skip_async;
154 istream_class->skip_finish = g_buffered_input_stream_skip_finish;
155 istream_class->read_fn = g_buffered_input_stream_read;
156 istream_class->read_async = g_buffered_input_stream_read_async;
157 istream_class->read_finish = g_buffered_input_stream_read_finish;
158
159 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
160 bstream_class->fill = g_buffered_input_stream_real_fill;
161 bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
162 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
163
164 g_object_class_install_property (object_class,
165 PROP_BUFSIZE,
166 g_param_spec_uint ("buffer-size",
167 P_("Buffer Size"),
168 P_("The size of the backend buffer"),
169 1,
170 G_MAXUINT,
171 DEFAULT_BUFFER_SIZE,
172 G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
173 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
174
175
176 }
177
178 /**
179 * g_buffered_input_stream_get_buffer_size:
180 * @stream: #GBufferedInputStream.
181 *
182 * Gets the size of the input buffer.
183 *
184 * Returns: the current buffer size.
185 **/
186 gsize
g_buffered_input_stream_get_buffer_size(GBufferedInputStream * stream)187 g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream)
188 {
189 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
190
191 return stream->priv->len;
192 }
193
194 /**
195 * g_buffered_input_stream_set_buffer_size:
196 * @stream: #GBufferedInputStream.
197 * @size: a #gsize.
198 *
199 * Sets the size of the internal buffer of @stream to @size, or to the
200 * size of the contents of the buffer. The buffer can never be resized
201 * smaller than its current contents.
202 **/
203 void
g_buffered_input_stream_set_buffer_size(GBufferedInputStream * stream,gsize size)204 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
205 gsize size)
206 {
207 GBufferedInputStreamPrivate *priv;
208 gsize in_buffer;
209 guint8 *buffer;
210
211 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
212
213 priv = stream->priv;
214
215 if (priv->len == size)
216 return;
217
218 if (priv->buffer)
219 {
220 in_buffer = priv->end - priv->pos;
221
222 /* Never resize smaller than current buffer contents */
223 size = MAX (size, in_buffer);
224
225 buffer = g_malloc (size);
226 memcpy (buffer, priv->buffer + priv->pos, in_buffer);
227 priv->len = size;
228 priv->pos = 0;
229 priv->end = in_buffer;
230 g_free (priv->buffer);
231 priv->buffer = buffer;
232 }
233 else
234 {
235 priv->len = size;
236 priv->pos = 0;
237 priv->end = 0;
238 priv->buffer = g_malloc (size);
239 }
240
241 g_object_notify (G_OBJECT (stream), "buffer-size");
242 }
243
244 static void
g_buffered_input_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)245 g_buffered_input_stream_set_property (GObject *object,
246 guint prop_id,
247 const GValue *value,
248 GParamSpec *pspec)
249 {
250 GBufferedInputStreamPrivate *priv;
251 GBufferedInputStream *bstream;
252
253 bstream = G_BUFFERED_INPUT_STREAM (object);
254 priv = bstream->priv;
255
256 switch (prop_id)
257 {
258 case PROP_BUFSIZE:
259 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
260 break;
261
262 default:
263 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
264 break;
265 }
266
267 }
268
269 static void
g_buffered_input_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)270 g_buffered_input_stream_get_property (GObject *object,
271 guint prop_id,
272 GValue *value,
273 GParamSpec *pspec)
274 {
275 GBufferedInputStreamPrivate *priv;
276 GBufferedInputStream *bstream;
277
278 bstream = G_BUFFERED_INPUT_STREAM (object);
279 priv = bstream->priv;
280
281 switch (prop_id)
282 {
283 case PROP_BUFSIZE:
284 g_value_set_uint (value, priv->len);
285 break;
286
287 default:
288 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
289 break;
290 }
291 }
292
293 static void
g_buffered_input_stream_finalize(GObject * object)294 g_buffered_input_stream_finalize (GObject *object)
295 {
296 GBufferedInputStreamPrivate *priv;
297 GBufferedInputStream *stream;
298
299 stream = G_BUFFERED_INPUT_STREAM (object);
300 priv = stream->priv;
301
302 g_free (priv->buffer);
303
304 G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
305 }
306
307 static void
g_buffered_input_stream_init(GBufferedInputStream * stream)308 g_buffered_input_stream_init (GBufferedInputStream *stream)
309 {
310 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
311 G_TYPE_BUFFERED_INPUT_STREAM,
312 GBufferedInputStreamPrivate);
313 }
314
315
316 /**
317 * g_buffered_input_stream_new:
318 * @base_stream: a #GInputStream.
319 *
320 * Creates a new #GInputStream from the given @base_stream, with
321 * a buffer set to the default size (4 kilobytes).
322 *
323 * Returns: a #GInputStream for the given @base_stream.
324 **/
325 GInputStream *
g_buffered_input_stream_new(GInputStream * base_stream)326 g_buffered_input_stream_new (GInputStream *base_stream)
327 {
328 GInputStream *stream;
329
330 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
331
332 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
333 "base-stream", base_stream,
334 NULL);
335
336 return stream;
337 }
338
339 /**
340 * g_buffered_input_stream_new_sized:
341 * @base_stream: a #GInputStream.
342 * @size: a #gsize.
343 *
344 * Creates a new #GBufferedInputStream from the given @base_stream,
345 * with a buffer set to @size.
346 *
347 * Returns: a #GInputStream.
348 **/
349 GInputStream *
g_buffered_input_stream_new_sized(GInputStream * base_stream,gsize size)350 g_buffered_input_stream_new_sized (GInputStream *base_stream,
351 gsize size)
352 {
353 GInputStream *stream;
354
355 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
356
357 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
358 "base-stream", base_stream,
359 "buffer-size", (guint)size,
360 NULL);
361
362 return stream;
363 }
364
365 /**
366 * g_buffered_input_stream_fill:
367 * @stream: #GBufferedInputStream.
368 * @count: the number of bytes that will be read from the stream.
369 * @cancellable: optional #GCancellable object, %NULL to ignore.
370 * @error: location to store the error occuring, or %NULL to ignore.
371 *
372 * Tries to read @count bytes from the stream into the buffer.
373 * Will block during this read.
374 *
375 * If @count is zero, returns zero and does nothing. A value of @count
376 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
377 *
378 * On success, the number of bytes read into the buffer is returned.
379 * It is not an error if this is not the same as the requested size, as it
380 * can happen e.g. near the end of a file. Zero is returned on end of file
381 * (or if @count is zero), but never otherwise.
382 *
383 * If @count is -1 then the attempted read size is equal to the number of
384 * bytes that are required to fill the buffer.
385 *
386 * If @cancellable is not %NULL, then the operation can be cancelled by
387 * triggering the cancellable object from another thread. If the operation
388 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
389 * operation was partially finished when the operation was cancelled the
390 * partial result will be returned, without an error.
391 *
392 * On error -1 is returned and @error is set accordingly.
393 *
394 * For the asynchronous, non-blocking, version of this function, see
395 * g_buffered_input_stream_fill_async().
396 *
397 * Returns: the number of bytes read into @stream's buffer, up to @count,
398 * or -1 on error.
399 **/
400 gssize
g_buffered_input_stream_fill(GBufferedInputStream * stream,gssize count,GCancellable * cancellable,GError ** error)401 g_buffered_input_stream_fill (GBufferedInputStream *stream,
402 gssize count,
403 GCancellable *cancellable,
404 GError **error)
405 {
406 GBufferedInputStreamClass *class;
407 GInputStream *input_stream;
408 gssize res;
409
410 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
411
412 input_stream = G_INPUT_STREAM (stream);
413
414 if (count < -1)
415 {
416 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
417 _("Too large count value passed to %s"), G_STRFUNC);
418 return -1;
419 }
420
421 if (!g_input_stream_set_pending (input_stream, error))
422 return -1;
423
424 if (cancellable)
425 g_cancellable_push_current (cancellable);
426
427 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
428 res = class->fill (stream, count, cancellable, error);
429
430 if (cancellable)
431 g_cancellable_pop_current (cancellable);
432
433 g_input_stream_clear_pending (input_stream);
434
435 return res;
436 }
437
438 static void
async_fill_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)439 async_fill_callback_wrapper (GObject *source_object,
440 GAsyncResult *res,
441 gpointer user_data)
442 {
443 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
444
445 g_input_stream_clear_pending (G_INPUT_STREAM (stream));
446 (*stream->priv->outstanding_callback) (source_object, res, user_data);
447 g_object_unref (stream);
448 }
449
450 /**
451 * g_buffered_input_stream_fill_async:
452 * @stream: #GBufferedInputStream.
453 * @count: the number of bytes that will be read from the stream.
454 * @io_priority: the <link linkend="io-priority">I/O priority</link>
455 * of the request.
456 * @cancellable: optional #GCancellable object
457 * @callback: a #GAsyncReadyCallback.
458 * @user_data: a #gpointer.
459 *
460 * Reads data into @stream's buffer asynchronously, up to @count size.
461 * @io_priority can be used to prioritize reads. For the synchronous
462 * version of this function, see g_buffered_input_stream_fill().
463 *
464 * If @count is -1 then the attempted read size is equal to the number
465 * of bytes that are required to fill the buffer.
466 **/
467 void
g_buffered_input_stream_fill_async(GBufferedInputStream * stream,gssize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)468 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
469 gssize count,
470 int io_priority,
471 GCancellable *cancellable,
472 GAsyncReadyCallback callback,
473 gpointer user_data)
474 {
475 GBufferedInputStreamClass *class;
476 GSimpleAsyncResult *simple;
477 GError *error = NULL;
478
479 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
480
481 if (count == 0)
482 {
483 simple = g_simple_async_result_new (G_OBJECT (stream),
484 callback,
485 user_data,
486 g_buffered_input_stream_fill_async);
487 g_simple_async_result_complete_in_idle (simple);
488 g_object_unref (simple);
489 return;
490 }
491
492 if (count < -1)
493 {
494 g_simple_async_report_error_in_idle (G_OBJECT (stream),
495 callback,
496 user_data,
497 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
498 _("Too large count value passed to %s"),
499 G_STRFUNC);
500 return;
501 }
502
503 if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
504 {
505 g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
506 callback,
507 user_data,
508 error);
509 g_error_free (error);
510 return;
511 }
512
513 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
514
515 stream->priv->outstanding_callback = callback;
516 g_object_ref (stream);
517 class->fill_async (stream, count, io_priority, cancellable,
518 async_fill_callback_wrapper, user_data);
519 }
520
521 /**
522 * g_buffered_input_stream_fill_finish:
523 * @stream: a #GBufferedInputStream.
524 * @result: a #GAsyncResult.
525 * @error: a #GError.
526 *
527 * Finishes an asynchronous read.
528 *
529 * Returns: a #gssize of the read stream, or %-1 on an error.
530 **/
531 gssize
g_buffered_input_stream_fill_finish(GBufferedInputStream * stream,GAsyncResult * result,GError ** error)532 g_buffered_input_stream_fill_finish (GBufferedInputStream *stream,
533 GAsyncResult *result,
534 GError **error)
535 {
536 GSimpleAsyncResult *simple;
537 GBufferedInputStreamClass *class;
538
539 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
540 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
541
542 if (G_IS_SIMPLE_ASYNC_RESULT (result))
543 {
544 simple = G_SIMPLE_ASYNC_RESULT (result);
545 if (g_simple_async_result_propagate_error (simple, error))
546 return -1;
547
548 /* Special case read of 0 bytes */
549 if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
550 return 0;
551 }
552
553 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
554 return class->fill_finish (stream, result, error);
555 }
556
557 /**
558 * g_buffered_input_stream_get_available:
559 * @stream: #GBufferedInputStream.
560 *
561 * Gets the size of the available data within the stream.
562 *
563 * Returns: size of the available stream.
564 **/
565 gsize
g_buffered_input_stream_get_available(GBufferedInputStream * stream)566 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
567 {
568 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
569
570 return stream->priv->end - stream->priv->pos;
571 }
572
573 /**
574 * g_buffered_input_stream_peek:
575 * @stream: a #GBufferedInputStream.
576 * @buffer: a pointer to an allocated chunk of memory.
577 * @offset: a #gsize.
578 * @count: a #gsize.
579 *
580 * Peeks in the buffer, copying data of size @count into @buffer,
581 * offset @offset bytes.
582 *
583 * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
584 **/
585 gsize
g_buffered_input_stream_peek(GBufferedInputStream * stream,void * buffer,gsize offset,gsize count)586 g_buffered_input_stream_peek (GBufferedInputStream *stream,
587 void *buffer,
588 gsize offset,
589 gsize count)
590 {
591 gsize available;
592 gsize end;
593
594 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
595 g_return_val_if_fail (buffer != NULL, -1);
596
597 available = g_buffered_input_stream_get_available (stream);
598
599 if (offset > available)
600 return 0;
601
602 end = MIN (offset + count, available);
603 count = end - offset;
604
605 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
606 return count;
607 }
608
609 /**
610 * g_buffered_input_stream_peek_buffer:
611 * @stream: a #GBufferedInputStream.
612 * @count: a #gsize to get the number of bytes available in the buffer.
613 *
614 * Returns the buffer with the currently available bytes. The returned
615 * buffer must not be modified and will become invalid when reading from
616 * the stream or filling the buffer.
617 *
618 * Returns: read-only buffer
619 **/
620 const void*
g_buffered_input_stream_peek_buffer(GBufferedInputStream * stream,gsize * count)621 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
622 gsize *count)
623 {
624 GBufferedInputStreamPrivate *priv;
625
626 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
627
628 priv = stream->priv;
629
630 if (count)
631 *count = priv->end - priv->pos;
632
633 return priv->buffer + priv->pos;
634 }
635
636 static void
compact_buffer(GBufferedInputStream * stream)637 compact_buffer (GBufferedInputStream *stream)
638 {
639 GBufferedInputStreamPrivate *priv;
640 gsize current_size;
641
642 priv = stream->priv;
643
644 current_size = priv->end - priv->pos;
645
646 g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
647
648 priv->pos = 0;
649 priv->end = current_size;
650 }
651
652 static gssize
g_buffered_input_stream_real_fill(GBufferedInputStream * stream,gssize count,GCancellable * cancellable,GError ** error)653 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
654 gssize count,
655 GCancellable *cancellable,
656 GError **error)
657 {
658 GBufferedInputStreamPrivate *priv;
659 GInputStream *base_stream;
660 gssize nread;
661 gsize in_buffer;
662
663 priv = stream->priv;
664
665 if (count == -1)
666 count = priv->len;
667
668 in_buffer = priv->end - priv->pos;
669
670 /* Never fill more than can fit in the buffer */
671 count = MIN (count, priv->len - in_buffer);
672
673 /* If requested length does not fit at end, compact */
674 if (priv->len - priv->end < count)
675 compact_buffer (stream);
676
677 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
678 nread = g_input_stream_read (base_stream,
679 priv->buffer + priv->end,
680 count,
681 cancellable,
682 error);
683
684 if (nread > 0)
685 priv->end += nread;
686
687 return nread;
688 }
689
690 static gssize
g_buffered_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)691 g_buffered_input_stream_skip (GInputStream *stream,
692 gsize count,
693 GCancellable *cancellable,
694 GError **error)
695 {
696 GBufferedInputStream *bstream;
697 GBufferedInputStreamPrivate *priv;
698 GBufferedInputStreamClass *class;
699 GInputStream *base_stream;
700 gsize available, bytes_skipped;
701 gssize nread;
702
703 bstream = G_BUFFERED_INPUT_STREAM (stream);
704 priv = bstream->priv;
705
706 available = priv->end - priv->pos;
707
708 if (count <= available)
709 {
710 priv->pos += count;
711 return count;
712 }
713
714 /* Full request not available, skip all currently available and
715 * request refill for more
716 */
717
718 priv->pos = 0;
719 priv->end = 0;
720 bytes_skipped = available;
721 count -= available;
722
723 if (bytes_skipped > 0)
724 error = NULL; /* Ignore further errors if we already read some data */
725
726 if (count > priv->len)
727 {
728 /* Large request, shortcut buffer */
729
730 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
731
732 nread = g_input_stream_skip (base_stream,
733 count,
734 cancellable,
735 error);
736
737 if (nread < 0 && bytes_skipped == 0)
738 return -1;
739
740 if (nread > 0)
741 bytes_skipped += nread;
742
743 return bytes_skipped;
744 }
745
746 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
747 nread = class->fill (bstream, priv->len, cancellable, error);
748
749 if (nread < 0)
750 {
751 if (bytes_skipped == 0)
752 return -1;
753 else
754 return bytes_skipped;
755 }
756
757 available = priv->end - priv->pos;
758 count = MIN (count, available);
759
760 bytes_skipped += count;
761 priv->pos += count;
762
763 return bytes_skipped;
764 }
765
766 static gssize
g_buffered_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)767 g_buffered_input_stream_read (GInputStream *stream,
768 void *buffer,
769 gsize count,
770 GCancellable *cancellable,
771 GError **error)
772 {
773 GBufferedInputStream *bstream;
774 GBufferedInputStreamPrivate *priv;
775 GBufferedInputStreamClass *class;
776 GInputStream *base_stream;
777 gsize available, bytes_read;
778 gssize nread;
779
780 bstream = G_BUFFERED_INPUT_STREAM (stream);
781 priv = bstream->priv;
782
783 available = priv->end - priv->pos;
784
785 if (count <= available)
786 {
787 memcpy (buffer, priv->buffer + priv->pos, count);
788 priv->pos += count;
789 return count;
790 }
791
792 /* Full request not available, read all currently availbile and request refill for more */
793
794 memcpy (buffer, priv->buffer + priv->pos, available);
795 priv->pos = 0;
796 priv->end = 0;
797 bytes_read = available;
798 count -= available;
799
800 if (bytes_read > 0)
801 error = NULL; /* Ignore further errors if we already read some data */
802
803 if (count > priv->len)
804 {
805 /* Large request, shortcut buffer */
806
807 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
808
809 nread = g_input_stream_read (base_stream,
810 (char *)buffer + bytes_read,
811 count,
812 cancellable,
813 error);
814
815 if (nread < 0 && bytes_read == 0)
816 return -1;
817
818 if (nread > 0)
819 bytes_read += nread;
820
821 return bytes_read;
822 }
823
824 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
825 nread = class->fill (bstream, priv->len, cancellable, error);
826 if (nread < 0)
827 {
828 if (bytes_read == 0)
829 return -1;
830 else
831 return bytes_read;
832 }
833
834 available = priv->end - priv->pos;
835 count = MIN (count, available);
836
837 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
838 bytes_read += count;
839 priv->pos += count;
840
841 return bytes_read;
842 }
843
844 /**
845 * g_buffered_input_stream_read_byte:
846 * @stream: #GBufferedInputStream.
847 * @cancellable: optional #GCancellable object, %NULL to ignore.
848 * @error: location to store the error occuring, or %NULL to ignore.
849 *
850 * Tries to read a single byte from the stream or the buffer. Will block
851 * during this read.
852 *
853 * On success, the byte read from the stream is returned. On end of stream
854 * -1 is returned but it's not an exceptional error and @error is not set.
855 *
856 * If @cancellable is not %NULL, then the operation can be cancelled by
857 * triggering the cancellable object from another thread. If the operation
858 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
859 * operation was partially finished when the operation was cancelled the
860 * partial result will be returned, without an error.
861 *
862 * On error -1 is returned and @error is set accordingly.
863 *
864 * Returns: the byte read from the @stream, or -1 on end of stream or error.
865 **/
866 int
g_buffered_input_stream_read_byte(GBufferedInputStream * stream,GCancellable * cancellable,GError ** error)867 g_buffered_input_stream_read_byte (GBufferedInputStream *stream,
868 GCancellable *cancellable,
869 GError **error)
870 {
871 GBufferedInputStreamPrivate *priv;
872 GBufferedInputStreamClass *class;
873 GInputStream *input_stream;
874 gsize available;
875 gssize nread;
876
877 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
878
879 priv = stream->priv;
880 input_stream = G_INPUT_STREAM (stream);
881
882 if (g_input_stream_is_closed (input_stream))
883 {
884 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
885 _("Stream is already closed"));
886 return -1;
887 }
888
889 if (!g_input_stream_set_pending (input_stream, error))
890 return -1;
891
892 available = priv->end - priv->pos;
893
894 if (available != 0)
895 {
896 g_input_stream_clear_pending (input_stream);
897 return priv->buffer[priv->pos++];
898 }
899
900 /* Byte not available, request refill for more */
901
902 if (cancellable)
903 g_cancellable_push_current (cancellable);
904
905 priv->pos = 0;
906 priv->end = 0;
907
908 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
909 nread = class->fill (stream, priv->len, cancellable, error);
910
911 if (cancellable)
912 g_cancellable_pop_current (cancellable);
913
914 g_input_stream_clear_pending (input_stream);
915
916 if (nread <= 0)
917 return -1; /* error or end of stream */
918
919 return priv->buffer[priv->pos++];
920 }
921
922 /* ************************** */
923 /* Async stuff implementation */
924 /* ************************** */
925
926 static void
fill_async_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)927 fill_async_callback (GObject *source_object,
928 GAsyncResult *result,
929 gpointer user_data)
930 {
931 GError *error;
932 gssize res;
933 GSimpleAsyncResult *simple;
934
935 simple = user_data;
936
937 error = NULL;
938 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
939 result, &error);
940
941 g_simple_async_result_set_op_res_gssize (simple, res);
942 if (res == -1)
943 {
944 g_simple_async_result_set_from_error (simple, error);
945 g_error_free (error);
946 }
947 else
948 {
949 GBufferedInputStreamPrivate *priv;
950 GObject *object;
951
952 object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
953 priv = G_BUFFERED_INPUT_STREAM (object)->priv;
954
955 g_assert_cmpint (priv->end + res, <=, priv->len);
956 priv->end += res;
957
958 g_object_unref (object);
959 }
960
961 /* Complete immediately, not in idle, since we're already in a mainloop callout */
962 g_simple_async_result_complete (simple);
963 g_object_unref (simple);
964 }
965
966 static void
g_buffered_input_stream_real_fill_async(GBufferedInputStream * stream,gssize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)967 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
968 gssize count,
969 int io_priority,
970 GCancellable *cancellable,
971 GAsyncReadyCallback callback,
972 gpointer user_data)
973 {
974 GBufferedInputStreamPrivate *priv;
975 GInputStream *base_stream;
976 GSimpleAsyncResult *simple;
977 gsize in_buffer;
978
979 priv = stream->priv;
980
981 if (count == -1)
982 count = priv->len;
983
984 in_buffer = priv->end - priv->pos;
985
986 /* Never fill more than can fit in the buffer */
987 count = MIN (count, priv->len - in_buffer);
988
989 /* If requested length does not fit at end, compact */
990 if (priv->len - priv->end < count)
991 compact_buffer (stream);
992
993 simple = g_simple_async_result_new (G_OBJECT (stream),
994 callback, user_data,
995 g_buffered_input_stream_real_fill_async);
996
997 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
998 g_input_stream_read_async (base_stream,
999 priv->buffer + priv->end,
1000 count,
1001 io_priority,
1002 cancellable,
1003 fill_async_callback,
1004 simple);
1005 }
1006
1007 static gssize
g_buffered_input_stream_real_fill_finish(GBufferedInputStream * stream,GAsyncResult * result,GError ** error)1008 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1009 GAsyncResult *result,
1010 GError **error)
1011 {
1012 GSimpleAsyncResult *simple;
1013 gssize nread;
1014
1015 simple = G_SIMPLE_ASYNC_RESULT (result);
1016 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1017
1018 nread = g_simple_async_result_get_op_res_gssize (simple);
1019 return nread;
1020 }
1021
1022 typedef struct {
1023 gssize bytes_read;
1024 gssize count;
1025 void *buffer;
1026 } ReadAsyncData;
1027
1028 static void
free_read_async_data(gpointer _data)1029 free_read_async_data (gpointer _data)
1030 {
1031 ReadAsyncData *data = _data;
1032 g_slice_free (ReadAsyncData, data);
1033 }
1034
1035 static void
large_read_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1036 large_read_callback (GObject *source_object,
1037 GAsyncResult *result,
1038 gpointer user_data)
1039 {
1040 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1041 ReadAsyncData *data;
1042 GError *error;
1043 gssize nread;
1044
1045 data = g_simple_async_result_get_op_res_gpointer (simple);
1046
1047 error = NULL;
1048 nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1049 result, &error);
1050
1051 /* Only report the error if we've not already read some data */
1052 if (nread < 0 && data->bytes_read == 0)
1053 g_simple_async_result_set_from_error (simple, error);
1054
1055 if (nread > 0)
1056 data->bytes_read += nread;
1057
1058 if (error)
1059 g_error_free (error);
1060
1061 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1062 g_simple_async_result_complete (simple);
1063 g_object_unref (simple);
1064 }
1065
1066 static void
read_fill_buffer_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1067 read_fill_buffer_callback (GObject *source_object,
1068 GAsyncResult *result,
1069 gpointer user_data)
1070 {
1071 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1072 GBufferedInputStream *bstream;
1073 GBufferedInputStreamPrivate *priv;
1074 ReadAsyncData *data;
1075 GError *error;
1076 gssize nread;
1077 gsize available;
1078
1079 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1080 priv = bstream->priv;
1081
1082 data = g_simple_async_result_get_op_res_gpointer (simple);
1083
1084 error = NULL;
1085 nread = g_buffered_input_stream_fill_finish (bstream,
1086 result, &error);
1087
1088 if (nread < 0 && data->bytes_read == 0)
1089 g_simple_async_result_set_from_error (simple, error);
1090
1091
1092 if (nread > 0)
1093 {
1094 available = priv->end - priv->pos;
1095 data->count = MIN (data->count, available);
1096
1097 memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1098 data->bytes_read += data->count;
1099 priv->pos += data->count;
1100 }
1101
1102 if (error)
1103 g_error_free (error);
1104
1105 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1106 g_simple_async_result_complete (simple);
1107 g_object_unref (simple);
1108 }
1109
1110 static void
g_buffered_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1111 g_buffered_input_stream_read_async (GInputStream *stream,
1112 void *buffer,
1113 gsize count,
1114 int io_priority,
1115 GCancellable *cancellable,
1116 GAsyncReadyCallback callback,
1117 gpointer user_data)
1118 {
1119 GBufferedInputStream *bstream;
1120 GBufferedInputStreamPrivate *priv;
1121 GBufferedInputStreamClass *class;
1122 GInputStream *base_stream;
1123 gsize available;
1124 GSimpleAsyncResult *simple;
1125 ReadAsyncData *data;
1126
1127 bstream = G_BUFFERED_INPUT_STREAM (stream);
1128 priv = bstream->priv;
1129
1130 data = g_slice_new (ReadAsyncData);
1131 data->buffer = buffer;
1132 data->bytes_read = 0;
1133 simple = g_simple_async_result_new (G_OBJECT (stream),
1134 callback, user_data,
1135 g_buffered_input_stream_read_async);
1136 g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1137
1138 available = priv->end - priv->pos;
1139
1140 if (count <= available)
1141 {
1142 memcpy (buffer, priv->buffer + priv->pos, count);
1143 priv->pos += count;
1144 data->bytes_read = count;
1145
1146 g_simple_async_result_complete_in_idle (simple);
1147 g_object_unref (simple);
1148 return;
1149 }
1150
1151
1152 /* Full request not available, read all currently availbile and request refill for more */
1153
1154 memcpy (buffer, priv->buffer + priv->pos, available);
1155 priv->pos = 0;
1156 priv->end = 0;
1157
1158 count -= available;
1159
1160 data->bytes_read = available;
1161 data->count = count;
1162
1163 if (count > priv->len)
1164 {
1165 /* Large request, shortcut buffer */
1166
1167 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1168
1169 g_input_stream_read_async (base_stream,
1170 (char *)buffer + data->bytes_read,
1171 count,
1172 io_priority, cancellable,
1173 large_read_callback,
1174 simple);
1175 }
1176 else
1177 {
1178 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1179 class->fill_async (bstream, priv->len, io_priority, cancellable,
1180 read_fill_buffer_callback, simple);
1181 }
1182 }
1183
1184 static gssize
g_buffered_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1185 g_buffered_input_stream_read_finish (GInputStream *stream,
1186 GAsyncResult *result,
1187 GError **error)
1188 {
1189 GSimpleAsyncResult *simple;
1190 ReadAsyncData *data;
1191
1192 simple = G_SIMPLE_ASYNC_RESULT (result);
1193
1194 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1195
1196 data = g_simple_async_result_get_op_res_gpointer (simple);
1197
1198 return data->bytes_read;
1199 }
1200
1201 typedef struct {
1202 gssize bytes_skipped;
1203 gssize count;
1204 } SkipAsyncData;
1205
1206 static void
free_skip_async_data(gpointer _data)1207 free_skip_async_data (gpointer _data)
1208 {
1209 SkipAsyncData *data = _data;
1210 g_slice_free (SkipAsyncData, data);
1211 }
1212
1213 static void
large_skip_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1214 large_skip_callback (GObject *source_object,
1215 GAsyncResult *result,
1216 gpointer user_data)
1217 {
1218 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1219 SkipAsyncData *data;
1220 GError *error;
1221 gssize nread;
1222
1223 data = g_simple_async_result_get_op_res_gpointer (simple);
1224
1225 error = NULL;
1226 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1227 result, &error);
1228
1229 /* Only report the error if we've not already read some data */
1230 if (nread < 0 && data->bytes_skipped == 0)
1231 g_simple_async_result_set_from_error (simple, error);
1232
1233 if (nread > 0)
1234 data->bytes_skipped += nread;
1235
1236 if (error)
1237 g_error_free (error);
1238
1239 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1240 g_simple_async_result_complete (simple);
1241 g_object_unref (simple);
1242 }
1243
1244 static void
skip_fill_buffer_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1245 skip_fill_buffer_callback (GObject *source_object,
1246 GAsyncResult *result,
1247 gpointer user_data)
1248 {
1249 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1250 GBufferedInputStream *bstream;
1251 GBufferedInputStreamPrivate *priv;
1252 SkipAsyncData *data;
1253 GError *error;
1254 gssize nread;
1255 gsize available;
1256
1257 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1258 priv = bstream->priv;
1259
1260 data = g_simple_async_result_get_op_res_gpointer (simple);
1261
1262 error = NULL;
1263 nread = g_buffered_input_stream_fill_finish (bstream,
1264 result, &error);
1265
1266 if (nread < 0 && data->bytes_skipped == 0)
1267 g_simple_async_result_set_from_error (simple, error);
1268
1269
1270 if (nread > 0)
1271 {
1272 available = priv->end - priv->pos;
1273 data->count = MIN (data->count, available);
1274
1275 data->bytes_skipped += data->count;
1276 priv->pos += data->count;
1277 }
1278
1279 if (error)
1280 g_error_free (error);
1281
1282 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1283 g_simple_async_result_complete (simple);
1284 g_object_unref (simple);
1285 }
1286
1287 static void
g_buffered_input_stream_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1288 g_buffered_input_stream_skip_async (GInputStream *stream,
1289 gsize count,
1290 int io_priority,
1291 GCancellable *cancellable,
1292 GAsyncReadyCallback callback,
1293 gpointer user_data)
1294 {
1295 GBufferedInputStream *bstream;
1296 GBufferedInputStreamPrivate *priv;
1297 GBufferedInputStreamClass *class;
1298 GInputStream *base_stream;
1299 gsize available;
1300 GSimpleAsyncResult *simple;
1301 SkipAsyncData *data;
1302
1303 bstream = G_BUFFERED_INPUT_STREAM (stream);
1304 priv = bstream->priv;
1305
1306 data = g_slice_new (SkipAsyncData);
1307 data->bytes_skipped = 0;
1308 simple = g_simple_async_result_new (G_OBJECT (stream),
1309 callback, user_data,
1310 g_buffered_input_stream_skip_async);
1311 g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1312
1313 available = priv->end - priv->pos;
1314
1315 if (count <= available)
1316 {
1317 priv->pos += count;
1318 data->bytes_skipped = count;
1319
1320 g_simple_async_result_complete_in_idle (simple);
1321 g_object_unref (simple);
1322 return;
1323 }
1324
1325
1326 /* Full request not available, skip all currently availbile and request refill for more */
1327
1328 priv->pos = 0;
1329 priv->end = 0;
1330
1331 count -= available;
1332
1333 data->bytes_skipped = available;
1334 data->count = count;
1335
1336 if (count > priv->len)
1337 {
1338 /* Large request, shortcut buffer */
1339
1340 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1341
1342 g_input_stream_skip_async (base_stream,
1343 count,
1344 io_priority, cancellable,
1345 large_skip_callback,
1346 simple);
1347 }
1348 else
1349 {
1350 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1351 class->fill_async (bstream, priv->len, io_priority, cancellable,
1352 skip_fill_buffer_callback, simple);
1353 }
1354 }
1355
1356 static gssize
g_buffered_input_stream_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1357 g_buffered_input_stream_skip_finish (GInputStream *stream,
1358 GAsyncResult *result,
1359 GError **error)
1360 {
1361 GSimpleAsyncResult *simple;
1362 SkipAsyncData *data;
1363
1364 simple = G_SIMPLE_ASYNC_RESULT (result);
1365
1366 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1367
1368 data = g_simple_async_result_get_op_res_gpointer (simple);
1369
1370 return data->bytes_skipped;
1371 }
1372
1373
1374 #define __G_BUFFERED_INPUT_STREAM_C__
1375 #include "gioaliasdef.c"
1376