• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Christian Kellner <gicmo@gnome.org>
19  */
20 
21 #include "config.h"
22 #include "gbufferedoutputstream.h"
23 #include "goutputstream.h"
24 #include "gseekable.h"
25 #include "gtask.h"
26 #include "string.h"
27 #include "gioerror.h"
28 #include "glibintl.h"
29 
30 /**
31  * SECTION:gbufferedoutputstream
32  * @short_description: Buffered Output Stream
33  * @include: gio/gio.h
34  * @see_also: #GFilterOutputStream, #GOutputStream
35  *
36  * Buffered output stream implements #GFilterOutputStream and provides
37  * for buffered writes.
38  *
39  * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
40  *
41  * To create a buffered output stream, use g_buffered_output_stream_new(),
42  * or g_buffered_output_stream_new_sized() to specify the buffer's size
43  * at construction.
44  *
45  * To get the size of a buffer within a buffered input stream, use
46  * g_buffered_output_stream_get_buffer_size(). To change the size of a
47  * buffered output stream's buffer, use
48  * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
49  * size cannot be reduced below the size of the data within the buffer.
50  **/
51 
52 #define DEFAULT_BUFFER_SIZE 4096
53 
54 struct _GBufferedOutputStreamPrivate {
55   guint8 *buffer;
56   gsize   len;
57   goffset pos;
58   gboolean auto_grow;
59 };
60 
61 enum {
62   PROP_0,
63   PROP_BUFSIZE,
64   PROP_AUTO_GROW
65 };
66 
67 static void     g_buffered_output_stream_set_property (GObject      *object,
68                                                        guint         prop_id,
69                                                        const GValue *value,
70                                                        GParamSpec   *pspec);
71 
72 static void     g_buffered_output_stream_get_property (GObject    *object,
73                                                        guint       prop_id,
74                                                        GValue     *value,
75                                                        GParamSpec *pspec);
76 static void     g_buffered_output_stream_finalize     (GObject *object);
77 
78 
79 static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
80                                                        const void    *buffer,
81                                                        gsize          count,
82                                                        GCancellable  *cancellable,
83                                                        GError       **error);
84 static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
85                                                        GCancellable  *cancellable,
86                                                        GError          **error);
87 static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
88                                                        GCancellable   *cancellable,
89                                                        GError        **error);
90 
91 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
92                                                        int                   io_priority,
93                                                        GCancellable         *cancellable,
94                                                        GAsyncReadyCallback   callback,
95                                                        gpointer              data);
96 static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
97                                                        GAsyncResult         *result,
98                                                        GError              **error);
99 static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
100                                                        int                   io_priority,
101                                                        GCancellable         *cancellable,
102                                                        GAsyncReadyCallback   callback,
103                                                        gpointer              data);
104 static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
105                                                        GAsyncResult         *result,
106                                                        GError              **error);
107 
108 static void     g_buffered_output_stream_seekable_iface_init (GSeekableIface  *iface);
109 static goffset  g_buffered_output_stream_tell                (GSeekable       *seekable);
110 static gboolean g_buffered_output_stream_can_seek            (GSeekable       *seekable);
111 static gboolean g_buffered_output_stream_seek                (GSeekable       *seekable,
112 							      goffset          offset,
113 							      GSeekType        type,
114 							      GCancellable    *cancellable,
115 							      GError         **error);
116 static gboolean g_buffered_output_stream_can_truncate        (GSeekable       *seekable);
117 static gboolean g_buffered_output_stream_truncate            (GSeekable       *seekable,
118 							      goffset          offset,
119 							      GCancellable    *cancellable,
120 							      GError         **error);
121 
G_DEFINE_TYPE_WITH_CODE(GBufferedOutputStream,g_buffered_output_stream,G_TYPE_FILTER_OUTPUT_STREAM,G_ADD_PRIVATE (GBufferedOutputStream)G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,g_buffered_output_stream_seekable_iface_init))122 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream,
123 			 g_buffered_output_stream,
124 			 G_TYPE_FILTER_OUTPUT_STREAM,
125                          G_ADD_PRIVATE (GBufferedOutputStream)
126 			 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
127 						g_buffered_output_stream_seekable_iface_init))
128 
129 
130 static void
131 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
132 {
133   GObjectClass *object_class;
134   GOutputStreamClass *ostream_class;
135 
136   object_class = G_OBJECT_CLASS (klass);
137   object_class->get_property = g_buffered_output_stream_get_property;
138   object_class->set_property = g_buffered_output_stream_set_property;
139   object_class->finalize     = g_buffered_output_stream_finalize;
140 
141   ostream_class = G_OUTPUT_STREAM_CLASS (klass);
142   ostream_class->write_fn = g_buffered_output_stream_write;
143   ostream_class->flush = g_buffered_output_stream_flush;
144   ostream_class->close_fn = g_buffered_output_stream_close;
145   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
146   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
147   ostream_class->close_async  = g_buffered_output_stream_close_async;
148   ostream_class->close_finish = g_buffered_output_stream_close_finish;
149 
150   g_object_class_install_property (object_class,
151                                    PROP_BUFSIZE,
152                                    g_param_spec_uint ("buffer-size",
153                                                       P_("Buffer Size"),
154                                                       P_("The size of the backend buffer"),
155                                                       1,
156                                                       G_MAXUINT,
157                                                       DEFAULT_BUFFER_SIZE,
158                                                       G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
159                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
160 
161   g_object_class_install_property (object_class,
162                                    PROP_AUTO_GROW,
163                                    g_param_spec_boolean ("auto-grow",
164                                                          P_("Auto-grow"),
165                                                          P_("Whether the buffer should automatically grow"),
166                                                          FALSE,
167                                                          G_PARAM_READWRITE|
168                                                          G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
169 
170 }
171 
172 /**
173  * g_buffered_output_stream_get_buffer_size:
174  * @stream: a #GBufferedOutputStream.
175  *
176  * Gets the size of the buffer in the @stream.
177  *
178  * Returns: the current size of the buffer.
179  **/
180 gsize
g_buffered_output_stream_get_buffer_size(GBufferedOutputStream * stream)181 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
182 {
183   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
184 
185   return stream->priv->len;
186 }
187 
188 /**
189  * g_buffered_output_stream_set_buffer_size:
190  * @stream: a #GBufferedOutputStream.
191  * @size: a #gsize.
192  *
193  * Sets the size of the internal buffer to @size.
194  **/
195 void
g_buffered_output_stream_set_buffer_size(GBufferedOutputStream * stream,gsize size)196 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
197                                           gsize                  size)
198 {
199   GBufferedOutputStreamPrivate *priv;
200   guint8 *buffer;
201 
202   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
203 
204   priv = stream->priv;
205 
206   if (size == priv->len)
207     return;
208 
209   if (priv->buffer)
210     {
211       size = (priv->pos > 0) ? MAX (size, (gsize) priv->pos) : size;
212 
213       buffer = g_malloc (size);
214       memcpy (buffer, priv->buffer, priv->pos);
215       g_free (priv->buffer);
216       priv->buffer = buffer;
217       priv->len = size;
218       /* Keep old pos */
219     }
220   else
221     {
222       priv->buffer = g_malloc (size);
223       priv->len = size;
224       priv->pos = 0;
225     }
226 
227   g_object_notify (G_OBJECT (stream), "buffer-size");
228 }
229 
230 /**
231  * g_buffered_output_stream_get_auto_grow:
232  * @stream: a #GBufferedOutputStream.
233  *
234  * Checks if the buffer automatically grows as data is added.
235  *
236  * Returns: %TRUE if the @stream's buffer automatically grows,
237  * %FALSE otherwise.
238  **/
239 gboolean
g_buffered_output_stream_get_auto_grow(GBufferedOutputStream * stream)240 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
241 {
242   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
243 
244   return stream->priv->auto_grow;
245 }
246 
247 /**
248  * g_buffered_output_stream_set_auto_grow:
249  * @stream: a #GBufferedOutputStream.
250  * @auto_grow: a #gboolean.
251  *
252  * Sets whether or not the @stream's buffer should automatically grow.
253  * If @auto_grow is true, then each write will just make the buffer
254  * larger, and you must manually flush the buffer to actually write out
255  * the data to the underlying stream.
256  **/
257 void
g_buffered_output_stream_set_auto_grow(GBufferedOutputStream * stream,gboolean auto_grow)258 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
259                                         gboolean               auto_grow)
260 {
261   GBufferedOutputStreamPrivate *priv;
262   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
263   priv = stream->priv;
264   auto_grow = auto_grow != FALSE;
265   if (priv->auto_grow != auto_grow)
266     {
267       priv->auto_grow = auto_grow;
268       g_object_notify (G_OBJECT (stream), "auto-grow");
269     }
270 }
271 
272 static void
g_buffered_output_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)273 g_buffered_output_stream_set_property (GObject      *object,
274                                        guint         prop_id,
275                                        const GValue *value,
276                                        GParamSpec   *pspec)
277 {
278   GBufferedOutputStream *stream;
279 
280   stream = G_BUFFERED_OUTPUT_STREAM (object);
281 
282   switch (prop_id)
283     {
284     case PROP_BUFSIZE:
285       g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
286       break;
287 
288     case PROP_AUTO_GROW:
289       g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
290       break;
291 
292     default:
293       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
294       break;
295     }
296 
297 }
298 
299 static void
g_buffered_output_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)300 g_buffered_output_stream_get_property (GObject    *object,
301                                        guint       prop_id,
302                                        GValue     *value,
303                                        GParamSpec *pspec)
304 {
305   GBufferedOutputStream *buffered_stream;
306   GBufferedOutputStreamPrivate *priv;
307 
308   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
309   priv = buffered_stream->priv;
310 
311   switch (prop_id)
312     {
313     case PROP_BUFSIZE:
314       g_value_set_uint (value, priv->len);
315       break;
316 
317     case PROP_AUTO_GROW:
318       g_value_set_boolean (value, priv->auto_grow);
319       break;
320 
321     default:
322       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
323       break;
324     }
325 
326 }
327 
328 static void
g_buffered_output_stream_finalize(GObject * object)329 g_buffered_output_stream_finalize (GObject *object)
330 {
331   GBufferedOutputStream *stream;
332   GBufferedOutputStreamPrivate *priv;
333 
334   stream = G_BUFFERED_OUTPUT_STREAM (object);
335   priv = stream->priv;
336 
337   g_free (priv->buffer);
338 
339   G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
340 }
341 
342 static void
g_buffered_output_stream_init(GBufferedOutputStream * stream)343 g_buffered_output_stream_init (GBufferedOutputStream *stream)
344 {
345   stream->priv = g_buffered_output_stream_get_instance_private (stream);
346 }
347 
348 static void
g_buffered_output_stream_seekable_iface_init(GSeekableIface * iface)349 g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface)
350 {
351   iface->tell         = g_buffered_output_stream_tell;
352   iface->can_seek     = g_buffered_output_stream_can_seek;
353   iface->seek         = g_buffered_output_stream_seek;
354   iface->can_truncate = g_buffered_output_stream_can_truncate;
355   iface->truncate_fn  = g_buffered_output_stream_truncate;
356 }
357 
358 /**
359  * g_buffered_output_stream_new:
360  * @base_stream: a #GOutputStream.
361  *
362  * Creates a new buffered output stream for a base stream.
363  *
364  * Returns: a #GOutputStream for the given @base_stream.
365  **/
366 GOutputStream *
g_buffered_output_stream_new(GOutputStream * base_stream)367 g_buffered_output_stream_new (GOutputStream *base_stream)
368 {
369   GOutputStream *stream;
370 
371   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
372 
373   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
374                          "base-stream", base_stream,
375                          NULL);
376 
377   return stream;
378 }
379 
380 /**
381  * g_buffered_output_stream_new_sized:
382  * @base_stream: a #GOutputStream.
383  * @size: a #gsize.
384  *
385  * Creates a new buffered output stream with a given buffer size.
386  *
387  * Returns: a #GOutputStream with an internal buffer set to @size.
388  **/
389 GOutputStream *
g_buffered_output_stream_new_sized(GOutputStream * base_stream,gsize size)390 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
391                                     gsize          size)
392 {
393   GOutputStream *stream;
394 
395   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
396 
397   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
398                          "base-stream", base_stream,
399                          "buffer-size", size,
400                          NULL);
401 
402   return stream;
403 }
404 
405 static gboolean
flush_buffer(GBufferedOutputStream * stream,GCancellable * cancellable,GError ** error)406 flush_buffer (GBufferedOutputStream  *stream,
407               GCancellable           *cancellable,
408               GError                 **error)
409 {
410   GBufferedOutputStreamPrivate *priv;
411   GOutputStream                *base_stream;
412   gboolean                      res;
413   gsize                         bytes_written;
414   gsize                         count;
415 
416   priv = stream->priv;
417   bytes_written = 0;
418   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
419 
420   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
421 
422   res = g_output_stream_write_all (base_stream,
423                                    priv->buffer,
424                                    priv->pos,
425                                    &bytes_written,
426                                    cancellable,
427                                    error);
428 
429   count = priv->pos - bytes_written;
430 
431   if (count > 0)
432     memmove (priv->buffer, priv->buffer + bytes_written, count);
433 
434   priv->pos -= bytes_written;
435 
436   return res;
437 }
438 
439 static gssize
g_buffered_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)440 g_buffered_output_stream_write  (GOutputStream *stream,
441                                  const void    *buffer,
442                                  gsize          count,
443                                  GCancellable  *cancellable,
444                                  GError       **error)
445 {
446   GBufferedOutputStream        *bstream;
447   GBufferedOutputStreamPrivate *priv;
448   gboolean res;
449   gsize    n;
450   gsize new_size;
451 
452   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
453   priv = bstream->priv;
454 
455   n = priv->len - priv->pos;
456 
457   if (priv->auto_grow && n < count)
458     {
459       new_size = MAX (priv->len * 2, priv->len + count);
460       g_buffered_output_stream_set_buffer_size (bstream, new_size);
461     }
462   else if (n == 0)
463     {
464       res = flush_buffer (bstream, cancellable, error);
465 
466       if (res == FALSE)
467 	return -1;
468     }
469 
470   n = priv->len - priv->pos;
471 
472   count = MIN (count, n);
473   memcpy (priv->buffer + priv->pos, buffer, count);
474   priv->pos += count;
475 
476   return count;
477 }
478 
479 static gboolean
g_buffered_output_stream_flush(GOutputStream * stream,GCancellable * cancellable,GError ** error)480 g_buffered_output_stream_flush (GOutputStream  *stream,
481                                 GCancellable   *cancellable,
482                                 GError        **error)
483 {
484   GBufferedOutputStream *bstream;
485   GOutputStream                *base_stream;
486   gboolean res;
487 
488   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
489   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
490 
491   res = flush_buffer (bstream, cancellable, error);
492 
493   if (res == FALSE)
494     return FALSE;
495 
496   res = g_output_stream_flush (base_stream, cancellable, error);
497 
498   return res;
499 }
500 
501 static gboolean
g_buffered_output_stream_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)502 g_buffered_output_stream_close (GOutputStream  *stream,
503                                 GCancellable   *cancellable,
504                                 GError        **error)
505 {
506   GBufferedOutputStream        *bstream;
507   GOutputStream                *base_stream;
508   gboolean                      res;
509 
510   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
511   base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
512   res = flush_buffer (bstream, cancellable, error);
513 
514   if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
515     {
516       /* report the first error but still close the stream */
517       if (res)
518         res = g_output_stream_close (base_stream, cancellable, error);
519       else
520         g_output_stream_close (base_stream, cancellable, NULL);
521     }
522 
523   return res;
524 }
525 
526 static goffset
g_buffered_output_stream_tell(GSeekable * seekable)527 g_buffered_output_stream_tell (GSeekable *seekable)
528 {
529   GBufferedOutputStream        *bstream;
530   GBufferedOutputStreamPrivate *priv;
531   GOutputStream *base_stream;
532   GSeekable    *base_stream_seekable;
533   goffset base_offset;
534 
535   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
536   priv = bstream->priv;
537 
538   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
539   if (!G_IS_SEEKABLE (base_stream))
540     return 0;
541 
542   base_stream_seekable = G_SEEKABLE (base_stream);
543 
544   base_offset = g_seekable_tell (base_stream_seekable);
545   return base_offset + priv->pos;
546 }
547 
548 static gboolean
g_buffered_output_stream_can_seek(GSeekable * seekable)549 g_buffered_output_stream_can_seek (GSeekable *seekable)
550 {
551   GOutputStream *base_stream;
552 
553   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
554   return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
555 }
556 
557 static gboolean
g_buffered_output_stream_seek(GSeekable * seekable,goffset offset,GSeekType type,GCancellable * cancellable,GError ** error)558 g_buffered_output_stream_seek (GSeekable     *seekable,
559 			       goffset        offset,
560 			       GSeekType      type,
561 			       GCancellable  *cancellable,
562 			       GError       **error)
563 {
564   GBufferedOutputStream *bstream;
565   GOutputStream *base_stream;
566   GSeekable *base_stream_seekable;
567   gboolean flushed;
568 
569   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
570 
571   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
572   if (!G_IS_SEEKABLE (base_stream))
573     {
574       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
575                            _("Seek not supported on base stream"));
576       return FALSE;
577     }
578 
579   base_stream_seekable = G_SEEKABLE (base_stream);
580   flushed = flush_buffer (bstream, cancellable, error);
581   if (!flushed)
582     return FALSE;
583 
584   return g_seekable_seek (base_stream_seekable, offset, type, cancellable, error);
585 }
586 
587 static gboolean
g_buffered_output_stream_can_truncate(GSeekable * seekable)588 g_buffered_output_stream_can_truncate (GSeekable *seekable)
589 {
590   GOutputStream *base_stream;
591 
592   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
593   return G_IS_SEEKABLE (base_stream) && g_seekable_can_truncate (G_SEEKABLE (base_stream));
594 }
595 
596 static gboolean
g_buffered_output_stream_truncate(GSeekable * seekable,goffset offset,GCancellable * cancellable,GError ** error)597 g_buffered_output_stream_truncate (GSeekable     *seekable,
598 				   goffset        offset,
599 				   GCancellable  *cancellable,
600 				   GError       **error)
601 {
602   GBufferedOutputStream        *bstream;
603   GOutputStream *base_stream;
604   GSeekable *base_stream_seekable;
605   gboolean flushed;
606 
607   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
608   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
609   if (!G_IS_SEEKABLE (base_stream))
610     {
611       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
612                            _("Truncate not supported on base stream"));
613       return FALSE;
614     }
615 
616   base_stream_seekable = G_SEEKABLE (base_stream);
617 
618   flushed = flush_buffer (bstream, cancellable, error);
619   if (!flushed)
620     return FALSE;
621   return g_seekable_truncate (base_stream_seekable, offset, cancellable, error);
622 }
623 
624 /* ************************** */
625 /* Async stuff implementation */
626 /* ************************** */
627 
628 /* TODO: This should be using the base class async ops, not threads */
629 
630 typedef struct {
631 
632   guint flush_stream : 1;
633   guint close_stream : 1;
634 
635 } FlushData;
636 
637 static void
free_flush_data(gpointer data)638 free_flush_data (gpointer data)
639 {
640   g_slice_free (FlushData, data);
641 }
642 
643 /* This function is used by all three (i.e.
644  * _write, _flush, _close) functions since
645  * all of them will need to flush the buffer
646  * and so closing and writing is just a special
647  * case of flushing + some addition stuff */
648 static void
flush_buffer_thread(GTask * task,gpointer object,gpointer task_data,GCancellable * cancellable)649 flush_buffer_thread (GTask        *task,
650                      gpointer      object,
651                      gpointer      task_data,
652                      GCancellable *cancellable)
653 {
654   GBufferedOutputStream *stream;
655   GOutputStream *base_stream;
656   FlushData     *fdata;
657   gboolean       res;
658   GError        *error = NULL;
659 
660   stream = G_BUFFERED_OUTPUT_STREAM (object);
661   fdata = task_data;
662   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
663 
664   res = flush_buffer (stream, cancellable, &error);
665 
666   /* if flushing the buffer didn't work don't even bother
667    * to flush the stream but just report that error */
668   if (res && fdata->flush_stream)
669     res = g_output_stream_flush (base_stream, cancellable, &error);
670 
671   if (fdata->close_stream)
672     {
673 
674       /* if flushing the buffer or the stream returned
675        * an error report that first error but still try
676        * close the stream */
677       if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
678         {
679           if (res == FALSE)
680             g_output_stream_close (base_stream, cancellable, NULL);
681           else
682             res = g_output_stream_close (base_stream, cancellable, &error);
683         }
684     }
685 
686   if (res == FALSE)
687     g_task_return_error (task, error);
688   else
689     g_task_return_boolean (task, TRUE);
690 }
691 
692 static void
g_buffered_output_stream_flush_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)693 g_buffered_output_stream_flush_async (GOutputStream        *stream,
694                                       int                   io_priority,
695                                       GCancellable         *cancellable,
696                                       GAsyncReadyCallback   callback,
697                                       gpointer              data)
698 {
699   GTask *task;
700   FlushData *fdata;
701 
702   fdata = g_slice_new0 (FlushData);
703   fdata->flush_stream = TRUE;
704   fdata->close_stream = FALSE;
705 
706   task = g_task_new (stream, cancellable, callback, data);
707   g_task_set_source_tag (task, g_buffered_output_stream_flush_async);
708   g_task_set_task_data (task, fdata, free_flush_data);
709   g_task_set_priority (task, io_priority);
710 
711   g_task_run_in_thread (task, flush_buffer_thread);
712   g_object_unref (task);
713 }
714 
715 static gboolean
g_buffered_output_stream_flush_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)716 g_buffered_output_stream_flush_finish (GOutputStream        *stream,
717                                        GAsyncResult         *result,
718                                        GError              **error)
719 {
720   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
721 
722   return g_task_propagate_boolean (G_TASK (result), error);
723 }
724 
725 static void
g_buffered_output_stream_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)726 g_buffered_output_stream_close_async (GOutputStream        *stream,
727                                       int                   io_priority,
728                                       GCancellable         *cancellable,
729                                       GAsyncReadyCallback   callback,
730                                       gpointer              data)
731 {
732   GTask *task;
733   FlushData *fdata;
734 
735   fdata = g_slice_new0 (FlushData);
736   fdata->close_stream = TRUE;
737 
738   task = g_task_new (stream, cancellable, callback, data);
739   g_task_set_source_tag (task, g_buffered_output_stream_close_async);
740   g_task_set_task_data (task, fdata, free_flush_data);
741   g_task_set_priority (task, io_priority);
742 
743   g_task_run_in_thread (task, flush_buffer_thread);
744   g_object_unref (task);
745 }
746 
747 static gboolean
g_buffered_output_stream_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)748 g_buffered_output_stream_close_finish (GOutputStream        *stream,
749                                        GAsyncResult         *result,
750                                        GError              **error)
751 {
752   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
753 
754   return g_task_propagate_boolean (G_TASK (result), error);
755 }
756