• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23 
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33 
34 #include "gioalias.h"
35 
36 /**
37  * SECTION:gbufferedinputstream
38  * @short_description: Buffered Input Stream
39  * @include: gio/gio.h
40  * @see_also: #GFilterInputStream, #GInputStream
41  *
42  * Buffered input stream implements #GFilterInputStream and provides
43  * for buffered reads.
44  *
45  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46  *
47  * To create a buffered input stream, use g_buffered_input_stream_new(),
48  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49  * construction.
50  *
51  * To get the size of a buffer within a buffered input stream, use
52  * g_buffered_input_stream_get_buffer_size(). To change the size of a
53  * buffered input stream's buffer, use
54  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
55  * cannot be reduced below the size of the data within the buffer.
56  *
57  **/
58 
59 
60 
/* Default value, in bytes, of the "buffer-size" property (4 kilobytes). */
#define DEFAULT_BUFFER_SIZE 4096
62 
63 struct _GBufferedInputStreamPrivate {
64   guint8 *buffer;
65   gsize   len;
66   gsize   pos;
67   gsize   end;
68   GAsyncReadyCallback outstanding_callback;
69 };
70 
/* Property identifiers; PROP_0 is a placeholder so real ids start at 1. */
enum
{
  PROP_0 = 0,
  PROP_BUFSIZE = 1   /* "buffer-size" */
};
75 
76 static void g_buffered_input_stream_set_property  (GObject      *object,
77                                                    guint         prop_id,
78                                                    const GValue *value,
79                                                    GParamSpec   *pspec);
80 
81 static void g_buffered_input_stream_get_property  (GObject      *object,
82                                                    guint         prop_id,
83                                                    GValue       *value,
84                                                    GParamSpec   *pspec);
85 static void g_buffered_input_stream_finalize      (GObject *object);
86 
87 
88 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
89 							gsize                  count,
90 							GCancellable          *cancellable,
91 							GError               **error);
92 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
93 							gsize                  count,
94 							int                    io_priority,
95 							GCancellable          *cancellable,
96 							GAsyncReadyCallback    callback,
97 							gpointer               user_data);
98 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
99 							GAsyncResult          *result,
100 							GError               **error);
101 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
102 							void                  *buffer,
103 							gsize                  count,
104 							GCancellable          *cancellable,
105 							GError               **error);
106 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
107 							void                  *buffer,
108 							gsize                  count,
109 							int                    io_priority,
110 							GCancellable          *cancellable,
111 							GAsyncReadyCallback    callback,
112 							gpointer               user_data);
113 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
114 							GAsyncResult          *result,
115 							GError               **error);
116 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
117 							gssize                 count,
118 							GCancellable          *cancellable,
119 							GError               **error);
120 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
121 							gssize                 count,
122 							int                    io_priority,
123 							GCancellable          *cancellable,
124 							GAsyncReadyCallback    callback,
125 							gpointer               user_data);
126 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
127 							GAsyncResult          *result,
128 							GError               **error);
129 
130 static void compact_buffer (GBufferedInputStream *stream);
131 
G_DEFINE_TYPE(GBufferedInputStream,g_buffered_input_stream,G_TYPE_FILTER_INPUT_STREAM)132 G_DEFINE_TYPE (GBufferedInputStream,
133                g_buffered_input_stream,
134                G_TYPE_FILTER_INPUT_STREAM)
135 
136 
137 static void
138 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
139 {
140   GObjectClass *object_class;
141   GInputStreamClass *istream_class;
142   GBufferedInputStreamClass *bstream_class;
143 
144   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
145 
146   object_class = G_OBJECT_CLASS (klass);
147   object_class->get_property = g_buffered_input_stream_get_property;
148   object_class->set_property = g_buffered_input_stream_set_property;
149   object_class->finalize     = g_buffered_input_stream_finalize;
150 
151   istream_class = G_INPUT_STREAM_CLASS (klass);
152   istream_class->skip = g_buffered_input_stream_skip;
153   istream_class->skip_async  = g_buffered_input_stream_skip_async;
154   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
155   istream_class->read_fn = g_buffered_input_stream_read;
156   istream_class->read_async  = g_buffered_input_stream_read_async;
157   istream_class->read_finish = g_buffered_input_stream_read_finish;
158 
159   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
160   bstream_class->fill = g_buffered_input_stream_real_fill;
161   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
162   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
163 
164   g_object_class_install_property (object_class,
165                                    PROP_BUFSIZE,
166                                    g_param_spec_uint ("buffer-size",
167                                                       P_("Buffer Size"),
168                                                       P_("The size of the backend buffer"),
169                                                       1,
170                                                       G_MAXUINT,
171                                                       DEFAULT_BUFFER_SIZE,
172                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
173                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
174 
175 
176 }
177 
178 /**
179  * g_buffered_input_stream_get_buffer_size:
180  * @stream: #GBufferedInputStream.
181  *
182  * Gets the size of the input buffer.
183  *
184  * Returns: the current buffer size.
185  **/
186 gsize
g_buffered_input_stream_get_buffer_size(GBufferedInputStream * stream)187 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
188 {
189   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
190 
191   return stream->priv->len;
192 }
193 
194 /**
195  * g_buffered_input_stream_set_buffer_size:
196  * @stream: #GBufferedInputStream.
197  * @size: a #gsize.
198  *
199  * Sets the size of the internal buffer of @stream to @size, or to the
200  * size of the contents of the buffer. The buffer can never be resized
201  * smaller than its current contents.
202  **/
203 void
g_buffered_input_stream_set_buffer_size(GBufferedInputStream * stream,gsize size)204 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
205                                          gsize                  size)
206 {
207   GBufferedInputStreamPrivate *priv;
208   gsize in_buffer;
209   guint8 *buffer;
210 
211   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
212 
213   priv = stream->priv;
214 
215   if (priv->len == size)
216     return;
217 
218   if (priv->buffer)
219     {
220       in_buffer = priv->end - priv->pos;
221 
222       /* Never resize smaller than current buffer contents */
223       size = MAX (size, in_buffer);
224 
225       buffer = g_malloc (size);
226       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
227       priv->len = size;
228       priv->pos = 0;
229       priv->end = in_buffer;
230       g_free (priv->buffer);
231       priv->buffer = buffer;
232     }
233   else
234     {
235       priv->len = size;
236       priv->pos = 0;
237       priv->end = 0;
238       priv->buffer = g_malloc (size);
239     }
240 
241   g_object_notify (G_OBJECT (stream), "buffer-size");
242 }
243 
244 static void
g_buffered_input_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)245 g_buffered_input_stream_set_property (GObject      *object,
246                                       guint         prop_id,
247                                       const GValue *value,
248                                       GParamSpec   *pspec)
249 {
250   GBufferedInputStreamPrivate *priv;
251   GBufferedInputStream        *bstream;
252 
253   bstream = G_BUFFERED_INPUT_STREAM (object);
254   priv = bstream->priv;
255 
256   switch (prop_id)
257     {
258     case PROP_BUFSIZE:
259       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
260       break;
261 
262     default:
263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
264       break;
265     }
266 
267 }
268 
269 static void
g_buffered_input_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)270 g_buffered_input_stream_get_property (GObject    *object,
271                                       guint       prop_id,
272                                       GValue     *value,
273                                       GParamSpec *pspec)
274 {
275   GBufferedInputStreamPrivate *priv;
276   GBufferedInputStream        *bstream;
277 
278   bstream = G_BUFFERED_INPUT_STREAM (object);
279   priv = bstream->priv;
280 
281   switch (prop_id)
282     {
283     case PROP_BUFSIZE:
284       g_value_set_uint (value, priv->len);
285       break;
286 
287     default:
288       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
289       break;
290     }
291 }
292 
293 static void
g_buffered_input_stream_finalize(GObject * object)294 g_buffered_input_stream_finalize (GObject *object)
295 {
296   GBufferedInputStreamPrivate *priv;
297   GBufferedInputStream        *stream;
298 
299   stream = G_BUFFERED_INPUT_STREAM (object);
300   priv = stream->priv;
301 
302   g_free (priv->buffer);
303 
304   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
305 }
306 
307 static void
g_buffered_input_stream_init(GBufferedInputStream * stream)308 g_buffered_input_stream_init (GBufferedInputStream *stream)
309 {
310   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
311                                               G_TYPE_BUFFERED_INPUT_STREAM,
312                                               GBufferedInputStreamPrivate);
313 }
314 
315 
316 /**
317  * g_buffered_input_stream_new:
318  * @base_stream: a #GInputStream.
319  *
320  * Creates a new #GInputStream from the given @base_stream, with
321  * a buffer set to the default size (4 kilobytes).
322  *
323  * Returns: a #GInputStream for the given @base_stream.
324  **/
325 GInputStream *
g_buffered_input_stream_new(GInputStream * base_stream)326 g_buffered_input_stream_new (GInputStream *base_stream)
327 {
328   GInputStream *stream;
329 
330   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
331 
332   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
333                          "base-stream", base_stream,
334                          NULL);
335 
336   return stream;
337 }
338 
339 /**
340  * g_buffered_input_stream_new_sized:
341  * @base_stream: a #GInputStream.
342  * @size: a #gsize.
343  *
344  * Creates a new #GBufferedInputStream from the given @base_stream,
345  * with a buffer set to @size.
346  *
347  * Returns: a #GInputStream.
348  **/
349 GInputStream *
g_buffered_input_stream_new_sized(GInputStream * base_stream,gsize size)350 g_buffered_input_stream_new_sized (GInputStream *base_stream,
351                                    gsize         size)
352 {
353   GInputStream *stream;
354 
355   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
356 
357   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
358                          "base-stream", base_stream,
359                          "buffer-size", (guint)size,
360                          NULL);
361 
362   return stream;
363 }
364 
365 /**
366  * g_buffered_input_stream_fill:
367  * @stream: #GBufferedInputStream.
368  * @count: the number of bytes that will be read from the stream.
369  * @cancellable: optional #GCancellable object, %NULL to ignore.
370  * @error: location to store the error occurring, or %NULL to ignore.
371  *
372  * Tries to read @count bytes from the stream into the buffer.
373  * Will block during this read.
374  *
375  * If @count is zero, returns zero and does nothing. A value of @count
376  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
377  *
378  * On success, the number of bytes read into the buffer is returned.
379  * It is not an error if this is not the same as the requested size, as it
380  * can happen e.g. near the end of a file. Zero is returned on end of file
381  * (or if @count is zero),  but never otherwise.
382  *
383  * If @count is -1 then the attempted read size is equal to the number of
384  * bytes that are required to fill the buffer.
385  *
386  * If @cancellable is not %NULL, then the operation can be cancelled by
387  * triggering the cancellable object from another thread. If the operation
388  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
389  * operation was partially finished when the operation was cancelled the
390  * partial result will be returned, without an error.
391  *
392  * On error -1 is returned and @error is set accordingly.
393  *
394  * For the asynchronous, non-blocking, version of this function, see
395  * g_buffered_input_stream_fill_async().
396  *
397  * Returns: the number of bytes read into @stream's buffer, up to @count,
398  *     or -1 on error.
399  **/
400 gssize
g_buffered_input_stream_fill(GBufferedInputStream * stream,gssize count,GCancellable * cancellable,GError ** error)401 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
402                               gssize                 count,
403                               GCancellable          *cancellable,
404                               GError               **error)
405 {
406   GBufferedInputStreamClass *class;
407   GInputStream *input_stream;
408   gssize res;
409 
410   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
411 
412   input_stream = G_INPUT_STREAM (stream);
413 
414   if (count < -1)
415     {
416       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
417                    _("Too large count value passed to %s"), G_STRFUNC);
418       return -1;
419     }
420 
421   if (!g_input_stream_set_pending (input_stream, error))
422     return -1;
423 
424   if (cancellable)
425     g_cancellable_push_current (cancellable);
426 
427   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
428   res = class->fill (stream, count, cancellable, error);
429 
430   if (cancellable)
431     g_cancellable_pop_current (cancellable);
432 
433   g_input_stream_clear_pending (input_stream);
434 
435   return res;
436 }
437 
438 static void
async_fill_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)439 async_fill_callback_wrapper (GObject      *source_object,
440                              GAsyncResult *res,
441                              gpointer      user_data)
442 {
443   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
444 
445   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
446   (*stream->priv->outstanding_callback) (source_object, res, user_data);
447   g_object_unref (stream);
448 }
449 
450 /**
451  * g_buffered_input_stream_fill_async:
452  * @stream: #GBufferedInputStream.
453  * @count: the number of bytes that will be read from the stream.
454  * @io_priority: the <link linkend="io-priority">I/O priority</link>
455  *     of the request.
456  * @cancellable: optional #GCancellable object
457  * @callback: a #GAsyncReadyCallback.
458  * @user_data: a #gpointer.
459  *
460  * Reads data into @stream's buffer asynchronously, up to @count size.
461  * @io_priority can be used to prioritize reads. For the synchronous
462  * version of this function, see g_buffered_input_stream_fill().
463  *
464  * If @count is -1 then the attempted read size is equal to the number
465  * of bytes that are required to fill the buffer.
466  **/
467 void
g_buffered_input_stream_fill_async(GBufferedInputStream * stream,gssize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)468 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
469                                     gssize                count,
470                                     int                   io_priority,
471                                     GCancellable         *cancellable,
472                                     GAsyncReadyCallback   callback,
473                                     gpointer              user_data)
474 {
475   GBufferedInputStreamClass *class;
476   GSimpleAsyncResult *simple;
477   GError *error = NULL;
478 
479   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
480 
481   if (count == 0)
482     {
483       simple = g_simple_async_result_new (G_OBJECT (stream),
484 					  callback,
485 					  user_data,
486 					  g_buffered_input_stream_fill_async);
487       g_simple_async_result_complete_in_idle (simple);
488       g_object_unref (simple);
489       return;
490     }
491 
492   if (count < -1)
493     {
494       g_simple_async_report_error_in_idle (G_OBJECT (stream),
495 					   callback,
496 					   user_data,
497 					   G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
498 					   _("Too large count value passed to %s"),
499 					   G_STRFUNC);
500       return;
501     }
502 
503   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
504     {
505       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
506 					    callback,
507 					    user_data,
508 					    error);
509       g_error_free (error);
510       return;
511     }
512 
513   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
514 
515   stream->priv->outstanding_callback = callback;
516   g_object_ref (stream);
517   class->fill_async (stream, count, io_priority, cancellable,
518                      async_fill_callback_wrapper, user_data);
519 }
520 
521 /**
522  * g_buffered_input_stream_fill_finish:
523  * @stream: a #GBufferedInputStream.
524  * @result: a #GAsyncResult.
525  * @error: a #GError.
526  *
527  * Finishes an asynchronous read.
528  *
529  * Returns: a #gssize of the read stream, or %-1 on an error.
530  **/
531 gssize
g_buffered_input_stream_fill_finish(GBufferedInputStream * stream,GAsyncResult * result,GError ** error)532 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
533                                      GAsyncResult          *result,
534                                      GError               **error)
535 {
536   GSimpleAsyncResult *simple;
537   GBufferedInputStreamClass *class;
538 
539   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
540   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
541 
542   if (G_IS_SIMPLE_ASYNC_RESULT (result))
543     {
544       simple = G_SIMPLE_ASYNC_RESULT (result);
545       if (g_simple_async_result_propagate_error (simple, error))
546         return -1;
547 
548       /* Special case read of 0 bytes */
549       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
550         return 0;
551     }
552 
553   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
554   return class->fill_finish (stream, result, error);
555 }
556 
557 /**
558  * g_buffered_input_stream_get_available:
559  * @stream: #GBufferedInputStream.
560  *
561  * Gets the size of the available data within the stream.
562  *
563  * Returns: size of the available stream.
564  **/
565 gsize
g_buffered_input_stream_get_available(GBufferedInputStream * stream)566 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
567 {
568   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
569 
570   return stream->priv->end - stream->priv->pos;
571 }
572 
573 /**
574  * g_buffered_input_stream_peek:
575  * @stream: a #GBufferedInputStream.
576  * @buffer: a pointer to an allocated chunk of memory.
577  * @offset: a #gsize.
578  * @count: a #gsize.
579  *
580  * Peeks in the buffer, copying data of size @count into @buffer,
581  * offset @offset bytes.
582  *
583  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
584  **/
585 gsize
g_buffered_input_stream_peek(GBufferedInputStream * stream,void * buffer,gsize offset,gsize count)586 g_buffered_input_stream_peek (GBufferedInputStream *stream,
587                               void                 *buffer,
588                               gsize                 offset,
589                               gsize                 count)
590 {
591   gsize available;
592   gsize end;
593 
594   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
595   g_return_val_if_fail (buffer != NULL, -1);
596 
597   available = g_buffered_input_stream_get_available (stream);
598 
599   if (offset > available)
600     return 0;
601 
602   end = MIN (offset + count, available);
603   count = end - offset;
604 
605   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
606   return count;
607 }
608 
609 /**
610  * g_buffered_input_stream_peek_buffer:
611  * @stream: a #GBufferedInputStream.
612  * @count: a #gsize to get the number of bytes available in the buffer.
613  *
614  * Returns the buffer with the currently available bytes. The returned
615  * buffer must not be modified and will become invalid when reading from
616  * the stream or filling the buffer.
617  *
618  * Returns: read-only buffer
619  **/
620 const void*
g_buffered_input_stream_peek_buffer(GBufferedInputStream * stream,gsize * count)621 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
622                                      gsize                *count)
623 {
624   GBufferedInputStreamPrivate *priv;
625 
626   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
627 
628   priv = stream->priv;
629 
630   if (count)
631     *count = priv->end - priv->pos;
632 
633   return priv->buffer + priv->pos;
634 }
635 
636 static void
compact_buffer(GBufferedInputStream * stream)637 compact_buffer (GBufferedInputStream *stream)
638 {
639   GBufferedInputStreamPrivate *priv;
640   gsize current_size;
641 
642   priv = stream->priv;
643 
644   current_size = priv->end - priv->pos;
645 
646   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
647 
648   priv->pos = 0;
649   priv->end = current_size;
650 }
651 
652 static gssize
g_buffered_input_stream_real_fill(GBufferedInputStream * stream,gssize count,GCancellable * cancellable,GError ** error)653 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
654                                    gssize                 count,
655                                    GCancellable          *cancellable,
656                                    GError               **error)
657 {
658   GBufferedInputStreamPrivate *priv;
659   GInputStream *base_stream;
660   gssize nread;
661   gsize in_buffer;
662 
663   priv = stream->priv;
664 
665   if (count == -1)
666     count = priv->len;
667 
668   in_buffer = priv->end - priv->pos;
669 
670   /* Never fill more than can fit in the buffer */
671   count = MIN (count, priv->len - in_buffer);
672 
673   /* If requested length does not fit at end, compact */
674   if (priv->len - priv->end < count)
675     compact_buffer (stream);
676 
677   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
678   nread = g_input_stream_read (base_stream,
679                                priv->buffer + priv->end,
680                                count,
681                                cancellable,
682                                error);
683 
684   if (nread > 0)
685     priv->end += nread;
686 
687   return nread;
688 }
689 
690 static gssize
g_buffered_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)691 g_buffered_input_stream_skip (GInputStream  *stream,
692                               gsize          count,
693                               GCancellable  *cancellable,
694                               GError       **error)
695 {
696   GBufferedInputStream        *bstream;
697   GBufferedInputStreamPrivate *priv;
698   GBufferedInputStreamClass *class;
699   GInputStream *base_stream;
700   gsize available, bytes_skipped;
701   gssize nread;
702 
703   bstream = G_BUFFERED_INPUT_STREAM (stream);
704   priv = bstream->priv;
705 
706   available = priv->end - priv->pos;
707 
708   if (count <= available)
709     {
710       priv->pos += count;
711       return count;
712     }
713 
714   /* Full request not available, skip all currently available and
715    * request refill for more
716    */
717 
718   priv->pos = 0;
719   priv->end = 0;
720   bytes_skipped = available;
721   count -= available;
722 
723   if (bytes_skipped > 0)
724     error = NULL; /* Ignore further errors if we already read some data */
725 
726   if (count > priv->len)
727     {
728       /* Large request, shortcut buffer */
729 
730       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
731 
732       nread = g_input_stream_skip (base_stream,
733                                    count,
734                                    cancellable,
735                                    error);
736 
737       if (nread < 0 && bytes_skipped == 0)
738         return -1;
739 
740       if (nread > 0)
741         bytes_skipped += nread;
742 
743       return bytes_skipped;
744     }
745 
746   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
747   nread = class->fill (bstream, priv->len, cancellable, error);
748 
749   if (nread < 0)
750     {
751       if (bytes_skipped == 0)
752         return -1;
753       else
754         return bytes_skipped;
755     }
756 
757   available = priv->end - priv->pos;
758   count = MIN (count, available);
759 
760   bytes_skipped += count;
761   priv->pos += count;
762 
763   return bytes_skipped;
764 }
765 
766 static gssize
g_buffered_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)767 g_buffered_input_stream_read (GInputStream *stream,
768                               void         *buffer,
769                               gsize         count,
770                               GCancellable *cancellable,
771                               GError      **error)
772 {
773   GBufferedInputStream        *bstream;
774   GBufferedInputStreamPrivate *priv;
775   GBufferedInputStreamClass *class;
776   GInputStream *base_stream;
777   gsize available, bytes_read;
778   gssize nread;
779 
780   bstream = G_BUFFERED_INPUT_STREAM (stream);
781   priv = bstream->priv;
782 
783   available = priv->end - priv->pos;
784 
785   if (count <= available)
786     {
787       memcpy (buffer, priv->buffer + priv->pos, count);
788       priv->pos += count;
789       return count;
790     }
791 
792   /* Full request not available, read all currently availbile and request refill for more */
793 
794   memcpy (buffer, priv->buffer + priv->pos, available);
795   priv->pos = 0;
796   priv->end = 0;
797   bytes_read = available;
798   count -= available;
799 
800   if (bytes_read > 0)
801     error = NULL; /* Ignore further errors if we already read some data */
802 
803   if (count > priv->len)
804     {
805       /* Large request, shortcut buffer */
806 
807       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
808 
809       nread = g_input_stream_read (base_stream,
810 				   (char *)buffer + bytes_read,
811 				   count,
812 				   cancellable,
813 				   error);
814 
815       if (nread < 0 && bytes_read == 0)
816         return -1;
817 
818       if (nread > 0)
819         bytes_read += nread;
820 
821       return bytes_read;
822     }
823 
824   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
825   nread = class->fill (bstream, priv->len, cancellable, error);
826   if (nread < 0)
827     {
828       if (bytes_read == 0)
829         return -1;
830       else
831         return bytes_read;
832     }
833 
834   available = priv->end - priv->pos;
835   count = MIN (count, available);
836 
837   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
838   bytes_read += count;
839   priv->pos += count;
840 
841   return bytes_read;
842 }
843 
844 /**
845  * g_buffered_input_stream_read_byte:
846  * @stream: #GBufferedInputStream.
847  * @cancellable: optional #GCancellable object, %NULL to ignore.
848  * @error: location to store the error occurring, or %NULL to ignore.
849  *
850  * Tries to read a single byte from the stream or the buffer. Will block
851  * during this read.
852  *
853  * On success, the byte read from the stream is returned. On end of stream
854  * -1 is returned but it's not an exceptional error and @error is not set.
855  *
856  * If @cancellable is not %NULL, then the operation can be cancelled by
857  * triggering the cancellable object from another thread. If the operation
858  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
859  * operation was partially finished when the operation was cancelled the
860  * partial result will be returned, without an error.
861  *
862  * On error -1 is returned and @error is set accordingly.
863  *
864  * Returns: the byte read from the @stream, or -1 on end of stream or error.
865  **/
866 int
g_buffered_input_stream_read_byte(GBufferedInputStream * stream,GCancellable * cancellable,GError ** error)867 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
868                                    GCancellable          *cancellable,
869                                    GError               **error)
870 {
871   GBufferedInputStreamPrivate *priv;
872   GBufferedInputStreamClass *class;
873   GInputStream *input_stream;
874   gsize available;
875   gssize nread;
876 
877   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
878 
879   priv = stream->priv;
880   input_stream = G_INPUT_STREAM (stream);
881 
882   if (g_input_stream_is_closed (input_stream))
883     {
884       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
885                            _("Stream is already closed"));
886       return -1;
887     }
888 
889   if (!g_input_stream_set_pending (input_stream, error))
890     return -1;
891 
892   available = priv->end - priv->pos;
893 
894   if (available != 0)
895     {
896       g_input_stream_clear_pending (input_stream);
897       return priv->buffer[priv->pos++];
898     }
899 
900   /* Byte not available, request refill for more */
901 
902   if (cancellable)
903     g_cancellable_push_current (cancellable);
904 
905   priv->pos = 0;
906   priv->end = 0;
907 
908   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
909   nread = class->fill (stream, priv->len, cancellable, error);
910 
911   if (cancellable)
912     g_cancellable_pop_current (cancellable);
913 
914   g_input_stream_clear_pending (input_stream);
915 
916   if (nread <= 0)
917     return -1; /* error or end of stream */
918 
919   return priv->buffer[priv->pos++];
920 }
921 
922 /* ************************** */
923 /* Async stuff implementation */
924 /* ************************** */
925 
926 static void
fill_async_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)927 fill_async_callback (GObject      *source_object,
928                      GAsyncResult *result,
929                      gpointer      user_data)
930 {
931   GError *error;
932   gssize res;
933   GSimpleAsyncResult *simple;
934 
935   simple = user_data;
936 
937   error = NULL;
938   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
939 				    result, &error);
940 
941   g_simple_async_result_set_op_res_gssize (simple, res);
942   if (res == -1)
943     {
944       g_simple_async_result_set_from_error (simple, error);
945       g_error_free (error);
946     }
947   else
948     {
949       GBufferedInputStreamPrivate *priv;
950       GObject *object;
951 
952       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
953       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
954 
955       g_assert_cmpint (priv->end + res, <=, priv->len);
956       priv->end += res;
957 
958       g_object_unref (object);
959     }
960 
961   /* Complete immediately, not in idle, since we're already in a mainloop callout */
962   g_simple_async_result_complete (simple);
963   g_object_unref (simple);
964 }
965 
966 static void
g_buffered_input_stream_real_fill_async(GBufferedInputStream * stream,gssize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)967 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
968                                          gssize                count,
969                                          int                   io_priority,
970                                          GCancellable         *cancellable,
971                                          GAsyncReadyCallback   callback,
972                                          gpointer              user_data)
973 {
974   GBufferedInputStreamPrivate *priv;
975   GInputStream *base_stream;
976   GSimpleAsyncResult *simple;
977   gsize in_buffer;
978 
979   priv = stream->priv;
980 
981   if (count == -1)
982     count = priv->len;
983 
984   in_buffer = priv->end - priv->pos;
985 
986   /* Never fill more than can fit in the buffer */
987   count = MIN (count, priv->len - in_buffer);
988 
989   /* If requested length does not fit at end, compact */
990   if (priv->len - priv->end < count)
991     compact_buffer (stream);
992 
993   simple = g_simple_async_result_new (G_OBJECT (stream),
994 				      callback, user_data,
995 				      g_buffered_input_stream_real_fill_async);
996 
997   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
998   g_input_stream_read_async (base_stream,
999 			     priv->buffer + priv->end,
1000 			     count,
1001 			     io_priority,
1002 			     cancellable,
1003 			     fill_async_callback,
1004 			     simple);
1005 }
1006 
1007 static gssize
g_buffered_input_stream_real_fill_finish(GBufferedInputStream * stream,GAsyncResult * result,GError ** error)1008 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1009 					  GAsyncResult         *result,
1010 					  GError              **error)
1011 {
1012   GSimpleAsyncResult *simple;
1013   gssize nread;
1014 
1015   simple = G_SIMPLE_ASYNC_RESULT (result);
1016   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1017 
1018   nread = g_simple_async_result_get_op_res_gssize (simple);
1019   return nread;
1020 }
1021 
1022 typedef struct {
1023   gssize bytes_read;
1024   gssize count;
1025   void *buffer;
1026 } ReadAsyncData;
1027 
1028 static void
free_read_async_data(gpointer _data)1029 free_read_async_data (gpointer _data)
1030 {
1031   ReadAsyncData *data = _data;
1032   g_slice_free (ReadAsyncData, data);
1033 }
1034 
1035 static void
large_read_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1036 large_read_callback (GObject *source_object,
1037 		     GAsyncResult *result,
1038 		     gpointer user_data)
1039 {
1040   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1041   ReadAsyncData *data;
1042   GError *error;
1043   gssize nread;
1044 
1045   data = g_simple_async_result_get_op_res_gpointer (simple);
1046 
1047   error = NULL;
1048   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1049 				      result, &error);
1050 
1051   /* Only report the error if we've not already read some data */
1052   if (nread < 0 && data->bytes_read == 0)
1053     g_simple_async_result_set_from_error (simple, error);
1054 
1055   if (nread > 0)
1056     data->bytes_read += nread;
1057 
1058   if (error)
1059     g_error_free (error);
1060 
1061   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1062   g_simple_async_result_complete (simple);
1063   g_object_unref (simple);
1064 }
1065 
1066 static void
read_fill_buffer_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1067 read_fill_buffer_callback (GObject *source_object,
1068 			   GAsyncResult *result,
1069 			   gpointer user_data)
1070 {
1071   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1072   GBufferedInputStream *bstream;
1073   GBufferedInputStreamPrivate *priv;
1074   ReadAsyncData *data;
1075   GError *error;
1076   gssize nread;
1077   gsize available;
1078 
1079   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1080   priv = bstream->priv;
1081 
1082   data = g_simple_async_result_get_op_res_gpointer (simple);
1083 
1084   error = NULL;
1085   nread = g_buffered_input_stream_fill_finish (bstream,
1086 					       result, &error);
1087 
1088   if (nread < 0 && data->bytes_read == 0)
1089     g_simple_async_result_set_from_error (simple, error);
1090 
1091 
1092   if (nread > 0)
1093     {
1094       available = priv->end - priv->pos;
1095       data->count = MIN (data->count, available);
1096 
1097       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1098       data->bytes_read += data->count;
1099       priv->pos += data->count;
1100     }
1101 
1102   if (error)
1103     g_error_free (error);
1104 
1105   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1106   g_simple_async_result_complete (simple);
1107   g_object_unref (simple);
1108 }
1109 
1110 static void
g_buffered_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1111 g_buffered_input_stream_read_async (GInputStream              *stream,
1112                                     void                      *buffer,
1113                                     gsize                      count,
1114                                     int                        io_priority,
1115                                     GCancellable              *cancellable,
1116                                     GAsyncReadyCallback        callback,
1117                                     gpointer                   user_data)
1118 {
1119   GBufferedInputStream *bstream;
1120   GBufferedInputStreamPrivate *priv;
1121   GBufferedInputStreamClass *class;
1122   GInputStream *base_stream;
1123   gsize available;
1124   GSimpleAsyncResult *simple;
1125   ReadAsyncData *data;
1126 
1127   bstream = G_BUFFERED_INPUT_STREAM (stream);
1128   priv = bstream->priv;
1129 
1130   data = g_slice_new (ReadAsyncData);
1131   data->buffer = buffer;
1132   data->bytes_read = 0;
1133   simple = g_simple_async_result_new (G_OBJECT (stream),
1134 				      callback, user_data,
1135 				      g_buffered_input_stream_read_async);
1136   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1137 
1138   available = priv->end - priv->pos;
1139 
1140   if (count <= available)
1141     {
1142       memcpy (buffer, priv->buffer + priv->pos, count);
1143       priv->pos += count;
1144       data->bytes_read = count;
1145 
1146       g_simple_async_result_complete_in_idle (simple);
1147       g_object_unref (simple);
1148       return;
1149     }
1150 
1151 
1152   /* Full request not available, read all currently availbile and request refill for more */
1153 
1154   memcpy (buffer, priv->buffer + priv->pos, available);
1155   priv->pos = 0;
1156   priv->end = 0;
1157 
1158   count -= available;
1159 
1160   data->bytes_read = available;
1161   data->count = count;
1162 
1163   if (count > priv->len)
1164     {
1165       /* Large request, shortcut buffer */
1166 
1167       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1168 
1169       g_input_stream_read_async (base_stream,
1170 				 (char *)buffer + data->bytes_read,
1171 				 count,
1172 				 io_priority, cancellable,
1173 				 large_read_callback,
1174 				 simple);
1175     }
1176   else
1177     {
1178       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1179       class->fill_async (bstream, priv->len, io_priority, cancellable,
1180 			 read_fill_buffer_callback, simple);
1181     }
1182 }
1183 
1184 static gssize
g_buffered_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1185 g_buffered_input_stream_read_finish (GInputStream   *stream,
1186                                      GAsyncResult   *result,
1187                                      GError        **error)
1188 {
1189   GSimpleAsyncResult *simple;
1190   ReadAsyncData *data;
1191 
1192   simple = G_SIMPLE_ASYNC_RESULT (result);
1193 
1194   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1195 
1196   data = g_simple_async_result_get_op_res_gpointer (simple);
1197 
1198   return data->bytes_read;
1199 }
1200 
1201 typedef struct {
1202   gssize bytes_skipped;
1203   gssize count;
1204 } SkipAsyncData;
1205 
1206 static void
free_skip_async_data(gpointer _data)1207 free_skip_async_data (gpointer _data)
1208 {
1209   SkipAsyncData *data = _data;
1210   g_slice_free (SkipAsyncData, data);
1211 }
1212 
1213 static void
large_skip_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1214 large_skip_callback (GObject *source_object,
1215 		     GAsyncResult *result,
1216 		     gpointer user_data)
1217 {
1218   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1219   SkipAsyncData *data;
1220   GError *error;
1221   gssize nread;
1222 
1223   data = g_simple_async_result_get_op_res_gpointer (simple);
1224 
1225   error = NULL;
1226   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1227 				      result, &error);
1228 
1229   /* Only report the error if we've not already read some data */
1230   if (nread < 0 && data->bytes_skipped == 0)
1231     g_simple_async_result_set_from_error (simple, error);
1232 
1233   if (nread > 0)
1234     data->bytes_skipped += nread;
1235 
1236   if (error)
1237     g_error_free (error);
1238 
1239   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1240   g_simple_async_result_complete (simple);
1241   g_object_unref (simple);
1242 }
1243 
1244 static void
skip_fill_buffer_callback(GObject * source_object,GAsyncResult * result,gpointer user_data)1245 skip_fill_buffer_callback (GObject *source_object,
1246 			   GAsyncResult *result,
1247 			   gpointer user_data)
1248 {
1249   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1250   GBufferedInputStream *bstream;
1251   GBufferedInputStreamPrivate *priv;
1252   SkipAsyncData *data;
1253   GError *error;
1254   gssize nread;
1255   gsize available;
1256 
1257   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1258   priv = bstream->priv;
1259 
1260   data = g_simple_async_result_get_op_res_gpointer (simple);
1261 
1262   error = NULL;
1263   nread = g_buffered_input_stream_fill_finish (bstream,
1264 					       result, &error);
1265 
1266   if (nread < 0 && data->bytes_skipped == 0)
1267     g_simple_async_result_set_from_error (simple, error);
1268 
1269 
1270   if (nread > 0)
1271     {
1272       available = priv->end - priv->pos;
1273       data->count = MIN (data->count, available);
1274 
1275       data->bytes_skipped += data->count;
1276       priv->pos += data->count;
1277     }
1278 
1279   if (error)
1280     g_error_free (error);
1281 
1282   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1283   g_simple_async_result_complete (simple);
1284   g_object_unref (simple);
1285 }
1286 
1287 static void
g_buffered_input_stream_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1288 g_buffered_input_stream_skip_async (GInputStream              *stream,
1289                                     gsize                      count,
1290                                     int                        io_priority,
1291                                     GCancellable              *cancellable,
1292                                     GAsyncReadyCallback        callback,
1293                                     gpointer                   user_data)
1294 {
1295   GBufferedInputStream *bstream;
1296   GBufferedInputStreamPrivate *priv;
1297   GBufferedInputStreamClass *class;
1298   GInputStream *base_stream;
1299   gsize available;
1300   GSimpleAsyncResult *simple;
1301   SkipAsyncData *data;
1302 
1303   bstream = G_BUFFERED_INPUT_STREAM (stream);
1304   priv = bstream->priv;
1305 
1306   data = g_slice_new (SkipAsyncData);
1307   data->bytes_skipped = 0;
1308   simple = g_simple_async_result_new (G_OBJECT (stream),
1309 				      callback, user_data,
1310 				      g_buffered_input_stream_skip_async);
1311   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1312 
1313   available = priv->end - priv->pos;
1314 
1315   if (count <= available)
1316     {
1317       priv->pos += count;
1318       data->bytes_skipped = count;
1319 
1320       g_simple_async_result_complete_in_idle (simple);
1321       g_object_unref (simple);
1322       return;
1323     }
1324 
1325 
1326   /* Full request not available, skip all currently availbile and request refill for more */
1327 
1328   priv->pos = 0;
1329   priv->end = 0;
1330 
1331   count -= available;
1332 
1333   data->bytes_skipped = available;
1334   data->count = count;
1335 
1336   if (count > priv->len)
1337     {
1338       /* Large request, shortcut buffer */
1339 
1340       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1341 
1342       g_input_stream_skip_async (base_stream,
1343 				 count,
1344 				 io_priority, cancellable,
1345 				 large_skip_callback,
1346 				 simple);
1347     }
1348   else
1349     {
1350       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1351       class->fill_async (bstream, priv->len, io_priority, cancellable,
1352 			 skip_fill_buffer_callback, simple);
1353     }
1354 }
1355 
1356 static gssize
g_buffered_input_stream_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1357 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1358                                      GAsyncResult   *result,
1359                                      GError        **error)
1360 {
1361   GSimpleAsyncResult *simple;
1362   SkipAsyncData *data;
1363 
1364   simple = G_SIMPLE_ASYNC_RESULT (result);
1365 
1366   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1367 
1368   data = g_simple_async_result_get_op_res_gpointer (simple);
1369 
1370   return data->bytes_skipped;
1371 }
1372 
1373 
1374 #define __G_BUFFERED_INPUT_STREAM_C__
1375 #include "gioaliasdef.c"
1376