• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20 
21 #include "config.h"
22 #include <string.h>
23 #include "goutputstream.h"
24 #include "gcancellable.h"
25 #include "gasyncresult.h"
26 #include "gtask.h"
27 #include "ginputstream.h"
28 #include "gioerror.h"
29 #include "gioprivate.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32 
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()).
41  *
42  * To copy the content of an input stream to an output stream without
43  * manually handling the reads and writes, use g_output_stream_splice().
44  *
45  * See the documentation for #GIOStream for details of thread safety of
46  * streaming APIs.
47  *
48  * All of these functions have async variants too.
49  **/
50 
/* Private per-instance state of a GOutputStream. */
51 struct _GOutputStreamPrivate {
52   guint closed : 1; /* tested by dispose and internal_close so the stream is not closed twice */
53   guint pending : 1; /* NOTE(review): presumably toggled by g_output_stream_set_pending()/g_output_stream_clear_pending() - confirm */
54   guint closing : 1; /* NOTE(review): not referenced in the visible part of this file - confirm usage */
55   GAsyncReadyCallback outstanding_callback; /* NOTE(review): not referenced in the visible part of this file - confirm usage */
56 };
57 
58 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
59 
60 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
61 						    GInputStream              *source,
62 						    GOutputStreamSpliceFlags   flags,
63 						    GCancellable              *cancellable,
64 						    GError                   **error);
65 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
66 						    const void                *buffer,
67 						    gsize                      count,
68 						    int                        io_priority,
69 						    GCancellable              *cancellable,
70 						    GAsyncReadyCallback        callback,
71 						    gpointer                   data);
72 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
73 						    GAsyncResult              *result,
74 						    GError                   **error);
75 static gboolean g_output_stream_real_writev        (GOutputStream             *stream,
76 						    const GOutputVector       *vectors,
77 						    gsize                      n_vectors,
78 						    gsize                     *bytes_written,
79 						    GCancellable              *cancellable,
80 						    GError                   **error);
81 static void     g_output_stream_real_writev_async  (GOutputStream             *stream,
82 						    const GOutputVector       *vectors,
83 						    gsize                      n_vectors,
84 						    int                        io_priority,
85 						    GCancellable              *cancellable,
86 						    GAsyncReadyCallback        callback,
87 						    gpointer                   data);
88 static gboolean g_output_stream_real_writev_finish (GOutputStream             *stream,
89 						    GAsyncResult              *result,
90 						    gsize                     *bytes_written,
91 						    GError                   **error);
92 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
93 						    GInputStream              *source,
94 						    GOutputStreamSpliceFlags   flags,
95 						    int                        io_priority,
96 						    GCancellable              *cancellable,
97 						    GAsyncReadyCallback        callback,
98 						    gpointer                   data);
99 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
100 						    GAsyncResult              *result,
101 						    GError                   **error);
102 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
103 						    int                        io_priority,
104 						    GCancellable              *cancellable,
105 						    GAsyncReadyCallback        callback,
106 						    gpointer                   data);
107 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
108 						    GAsyncResult              *result,
109 						    GError                   **error);
110 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
111 						    int                        io_priority,
112 						    GCancellable              *cancellable,
113 						    GAsyncReadyCallback        callback,
114 						    gpointer                   data);
115 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
116 						    GAsyncResult              *result,
117 						    GError                   **error);
118 static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
119                                                     GCancellable              *cancellable,
120                                                     GError                   **error);
121 static void     g_output_stream_internal_close_async (GOutputStream           *stream,
122                                                       int                      io_priority,
123                                                       GCancellable            *cancellable,
124                                                       GAsyncReadyCallback      callback,
125                                                       gpointer                 data);
126 static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
127                                                        GAsyncResult           *result,
128                                                        GError                **error);
129 
130 static void
g_output_stream_dispose(GObject * object)131 g_output_stream_dispose (GObject *object)
132 {
133   GOutputStream *stream;
134 
135   stream = G_OUTPUT_STREAM (object);
136 
137   if (!stream->priv->closed)
138     g_output_stream_close (stream, NULL, NULL);
139 
140   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
141 }
142 
143 static void
g_output_stream_class_init(GOutputStreamClass * klass)144 g_output_stream_class_init (GOutputStreamClass *klass)
145 {
146   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
147 
148   gobject_class->dispose = g_output_stream_dispose;
149 
150   klass->splice = g_output_stream_real_splice;
151 
152   klass->write_async = g_output_stream_real_write_async;
153   klass->write_finish = g_output_stream_real_write_finish;
154   klass->writev_fn = g_output_stream_real_writev;
155   klass->writev_async = g_output_stream_real_writev_async;
156   klass->writev_finish = g_output_stream_real_writev_finish;
157   klass->splice_async = g_output_stream_real_splice_async;
158   klass->splice_finish = g_output_stream_real_splice_finish;
159   klass->flush_async = g_output_stream_real_flush_async;
160   klass->flush_finish = g_output_stream_real_flush_finish;
161   klass->close_async = g_output_stream_real_close_async;
162   klass->close_finish = g_output_stream_real_close_finish;
163 }
164 
165 static void
g_output_stream_init(GOutputStream * stream)166 g_output_stream_init (GOutputStream *stream)
167 {
168   stream->priv = g_output_stream_get_instance_private (stream);
169 }
170 
171 /**
172  * g_output_stream_write:
173  * @stream: a #GOutputStream.
174  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
175  * @count: the number of bytes to write
176  * @cancellable: (nullable): optional cancellable object
177  * @error: location to store the error occurring, or %NULL to ignore
178  *
179  * Tries to write @count bytes from @buffer into the stream. Will block
180  * during the operation.
181  *
182  * If count is 0, returns 0 and does nothing. A value of @count
183  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
184  *
185  * On success, the number of bytes written to the stream is returned.
186  * It is not an error if this is not the same as the requested size, as it
187  * can happen e.g. on a partial I/O error, or if there is not enough
188  * storage in the stream. All writes block until at least one byte
189  * is written or an error occurs; 0 is never returned (unless
190  * @count is 0).
191  *
192  * If @cancellable is not %NULL, then the operation can be cancelled by
193  * triggering the cancellable object from another thread. If the operation
194  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
195  * operation was partially finished when the operation was cancelled the
196  * partial result will be returned, without an error.
197  *
198  * On error -1 is returned and @error is set accordingly.
199  *
200  * Virtual: write_fn
201  *
202  * Returns: Number of bytes written, or -1 on error
203  **/
204 gssize
g_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)205 g_output_stream_write (GOutputStream  *stream,
206 		       const void     *buffer,
207 		       gsize           count,
208 		       GCancellable   *cancellable,
209 		       GError        **error)
210 {
211   GOutputStreamClass *class;
212   gssize res;
213 
214   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
215   g_return_val_if_fail (buffer != NULL, 0);
216 
217   if (count == 0)
218     return 0;
219 
220   if (((gssize) count) < 0)
221     {
222       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
223 		   _("Too large count value passed to %s"), G_STRFUNC);
224       return -1;
225     }
226 
227   class = G_OUTPUT_STREAM_GET_CLASS (stream);
228 
229   if (class->write_fn == NULL)
230     {
231       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
232                            _("Output stream doesn’t implement write"));
233       return -1;
234     }
235 
236   if (!g_output_stream_set_pending (stream, error))
237     return -1;
238 
239   if (cancellable)
240     g_cancellable_push_current (cancellable);
241 
242   res = class->write_fn (stream, buffer, count, cancellable, error);
243 
244   if (cancellable)
245     g_cancellable_pop_current (cancellable);
246 
247   g_output_stream_clear_pending (stream);
248 
249   return res;
250 }
251 
252 /**
253  * g_output_stream_write_all:
254  * @stream: a #GOutputStream.
255  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
256  * @count: the number of bytes to write
257  * @bytes_written: (out) (optional): location to store the number of bytes that was
258  *     written to the stream
259  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
260  * @error: location to store the error occurring, or %NULL to ignore
261  *
262  * Tries to write @count bytes from @buffer into the stream. Will block
263  * during the operation.
264  *
265  * This function is similar to g_output_stream_write(), except it tries to
266  * write as many bytes as requested, only stopping on an error.
267  *
268  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
269  * is set to @count.
270  *
271  * If there is an error during the operation %FALSE is returned and @error
272  * is set to indicate the error status.
273  *
274  * As a special exception to the normal conventions for functions that
275  * use #GError, if this function returns %FALSE (and sets @error) then
276  * @bytes_written will be set to the number of bytes that were
277  * successfully written before the error was encountered.  This
278  * functionality is only available from C.  If you need it from another
279  * language then you must write your own loop around
280  * g_output_stream_write().
281  *
282  * Returns: %TRUE on success, %FALSE if there was an error
283  **/
284 gboolean
g_output_stream_write_all(GOutputStream * stream,const void * buffer,gsize count,gsize * bytes_written,GCancellable * cancellable,GError ** error)285 g_output_stream_write_all (GOutputStream  *stream,
286 			   const void     *buffer,
287 			   gsize           count,
288 			   gsize          *bytes_written,
289 			   GCancellable   *cancellable,
290 			   GError        **error)
291 {
292   gsize _bytes_written;
293   gssize res;
294 
295   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
296   g_return_val_if_fail (buffer != NULL, FALSE);
297 
298   _bytes_written = 0;
299   while (_bytes_written < count)
300     {
301       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
302 				   cancellable, error);
303       if (res == -1)
304 	{
305 	  if (bytes_written)
306 	    *bytes_written = _bytes_written;
307 	  return FALSE;
308 	}
309       g_return_val_if_fail (res > 0, FALSE);
310 
311       _bytes_written += res;
312     }
313 
314   if (bytes_written)
315     *bytes_written = _bytes_written;
316 
317   return TRUE;
318 }
319 
320 /**
321  * g_output_stream_writev:
322  * @stream: a #GOutputStream.
323  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
324  * @n_vectors: the number of vectors to write
325  * @bytes_written: (out) (optional): location to store the number of bytes that were
326  *     written to the stream
327  * @cancellable: (nullable): optional cancellable object
328  * @error: location to store the error occurring, or %NULL to ignore
329  *
330  * Tries to write the bytes contained in the @n_vectors @vectors into the
331  * stream. Will block during the operation.
332  *
333  * If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
334  * does nothing.
335  *
336  * On success, the number of bytes written to the stream is returned.
337  * It is not an error if this is not the same as the requested size, as it
338  * can happen e.g. on a partial I/O error, or if there is not enough
339  * storage in the stream. All writes block until at least one byte
340  * is written or an error occurs; 0 is never returned (unless
341  * @n_vectors is 0 or the sum of all bytes in @vectors is 0).
342  *
343  * If @cancellable is not %NULL, then the operation can be cancelled by
344  * triggering the cancellable object from another thread. If the operation
345  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
346  * operation was partially finished when the operation was cancelled the
347  * partial result will be returned, without an error.
348  *
349  * Some implementations of g_output_stream_writev() may have limitations on the
350  * aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
351  * are exceeded. For example, when writing to a local file on UNIX platforms,
352  * the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
353  *
354  * Virtual: writev_fn
355  *
356  * Returns: %TRUE on success, %FALSE if there was an error
357  *
358  * Since: 2.60
359  */
360 gboolean
g_output_stream_writev(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,gsize * bytes_written,GCancellable * cancellable,GError ** error)361 g_output_stream_writev (GOutputStream        *stream,
362 		        const GOutputVector  *vectors,
363 		        gsize                 n_vectors,
364 		        gsize                *bytes_written,
365 		        GCancellable         *cancellable,
366 		        GError              **error)
367 {
368   GOutputStreamClass *class;
369   gboolean res;
370   gsize _bytes_written = 0;
371 
372   if (bytes_written)
373     *bytes_written = 0;
374 
375   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
376   g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
377   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
378   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
379 
380   if (n_vectors == 0)
381     return TRUE;
382 
383   class = G_OUTPUT_STREAM_GET_CLASS (stream);
384 
385   g_return_val_if_fail (class->writev_fn != NULL, FALSE);
386 
387   if (!g_output_stream_set_pending (stream, error))
388     return FALSE;
389 
390   if (cancellable)
391     g_cancellable_push_current (cancellable);
392 
393   res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
394 
395   g_warn_if_fail (res || _bytes_written == 0);
396   g_warn_if_fail (res || (error == NULL || *error != NULL));
397 
398   if (cancellable)
399     g_cancellable_pop_current (cancellable);
400 
401   g_output_stream_clear_pending (stream);
402 
403   if (bytes_written)
404     *bytes_written = _bytes_written;
405 
406   return res;
407 }
408 
409 /**
410  * g_output_stream_writev_all:
411  * @stream: a #GOutputStream.
412  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
413  * @n_vectors: the number of vectors to write
414  * @bytes_written: (out) (optional): location to store the number of bytes that were
415  *     written to the stream
416  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
417  * @error: location to store the error occurring, or %NULL to ignore
418  *
419  * Tries to write the bytes contained in the @n_vectors @vectors into the
420  * stream. Will block during the operation.
421  *
422  * This function is similar to g_output_stream_writev(), except it tries to
423  * write as many bytes as requested, only stopping on an error.
424  *
425  * On a successful write of all @n_vectors vectors, %TRUE is returned, and
426  * @bytes_written is set to the sum of all the sizes of @vectors.
427  *
428  * If there is an error during the operation %FALSE is returned and @error
429  * is set to indicate the error status.
430  *
431  * As a special exception to the normal conventions for functions that
432  * use #GError, if this function returns %FALSE (and sets @error) then
433  * @bytes_written will be set to the number of bytes that were
434  * successfully written before the error was encountered.  This
435  * functionality is only available from C. If you need it from another
436  * language then you must write your own loop around
437  * g_output_stream_write().
438  *
439  * The content of the individual elements of @vectors might be changed by this
440  * function.
441  *
442  * Returns: %TRUE on success, %FALSE if there was an error
443  *
444  * Since: 2.60
445  */
446 gboolean
g_output_stream_writev_all(GOutputStream * stream,GOutputVector * vectors,gsize n_vectors,gsize * bytes_written,GCancellable * cancellable,GError ** error)447 g_output_stream_writev_all (GOutputStream  *stream,
448 			    GOutputVector  *vectors,
449 			    gsize           n_vectors,
450 			    gsize          *bytes_written,
451 			    GCancellable   *cancellable,
452 			    GError        **error)
453 {
454   gsize _bytes_written = 0;
455   gsize i, to_be_written = 0;
456 
457   if (bytes_written)
458     *bytes_written = 0;
459 
460   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
461   g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
462   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
463   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
464 
465   /* We can't write more than G_MAXSIZE bytes overall, otherwise we
466    * would overflow the bytes_written counter */
467   for (i = 0; i < n_vectors; i++)
468     {
469        if (to_be_written > G_MAXSIZE - vectors[i].size)
470          {
471            g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
472                         _("Sum of vectors passed to %s too large"), G_STRFUNC);
473            return FALSE;
474          }
475        to_be_written += vectors[i].size;
476     }
477 
478   _bytes_written = 0;
479   while (n_vectors > 0 && to_be_written > 0)
480     {
481       gsize n_written = 0;
482       gboolean res;
483 
484       res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
485 
486       if (!res)
487         {
488           if (bytes_written)
489             *bytes_written = _bytes_written;
490           return FALSE;
491         }
492 
493       g_return_val_if_fail (n_written > 0, FALSE);
494       _bytes_written += n_written;
495 
496       /* skip vectors that have been written in full */
497       while (n_vectors > 0 && n_written >= vectors[0].size)
498         {
499           n_written -= vectors[0].size;
500           ++vectors;
501           --n_vectors;
502         }
503       /* skip partially written vector data */
504       if (n_written > 0 && n_vectors > 0)
505         {
506           vectors[0].size -= n_written;
507           vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
508         }
509     }
510 
511   if (bytes_written)
512     *bytes_written = _bytes_written;
513 
514   return TRUE;
515 }
516 
517 /**
518  * g_output_stream_printf:
519  * @stream: a #GOutputStream.
520  * @bytes_written: (out) (optional): location to store the number of bytes that was
521  *     written to the stream
522  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
523  * @error: location to store the error occurring, or %NULL to ignore
524  * @format: the format string. See the printf() documentation
525  * @...: the parameters to insert into the format string
526  *
527  * This is a utility function around g_output_stream_write_all(). It
528  * uses g_strdup_vprintf() to turn @format and @... into a string that
529  * is then written to @stream.
530  *
531  * See the documentation of g_output_stream_write_all() about the
532  * behavior of the actual write operation.
533  *
534  * Note that partial writes cannot be properly checked with this
535  * function due to the variable length of the written string, if you
536  * need precise control over partial write failures, you need to
537  * create your own printf()-like wrapper around g_output_stream_write()
538  * or g_output_stream_write_all().
539  *
540  * Since: 2.40
541  *
542  * Returns: %TRUE on success, %FALSE if there was an error
543  **/
544 gboolean
g_output_stream_printf(GOutputStream * stream,gsize * bytes_written,GCancellable * cancellable,GError ** error,const gchar * format,...)545 g_output_stream_printf (GOutputStream  *stream,
546                         gsize          *bytes_written,
547                         GCancellable   *cancellable,
548                         GError        **error,
549                         const gchar    *format,
550                         ...)
551 {
552   va_list  args;
553   gboolean success;
554 
555   va_start (args, format);
556   success = g_output_stream_vprintf (stream, bytes_written, cancellable,
557                                      error, format, args);
558   va_end (args);
559 
560   return success;
561 }
562 
563 /**
564  * g_output_stream_vprintf:
565  * @stream: a #GOutputStream.
566  * @bytes_written: (out) (optional): location to store the number of bytes that was
567  *     written to the stream
568  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
569  * @error: location to store the error occurring, or %NULL to ignore
570  * @format: the format string. See the printf() documentation
571  * @args: the parameters to insert into the format string
572  *
573  * This is a utility function around g_output_stream_write_all(). It
574  * uses g_strdup_vprintf() to turn @format and @args into a string that
575  * is then written to @stream.
576  *
577  * See the documentation of g_output_stream_write_all() about the
578  * behavior of the actual write operation.
579  *
580  * Note that partial writes cannot be properly checked with this
581  * function due to the variable length of the written string, if you
582  * need precise control over partial write failures, you need to
583  * create your own printf()-like wrapper around g_output_stream_write()
584  * or g_output_stream_write_all().
585  *
586  * Since: 2.40
587  *
588  * Returns: %TRUE on success, %FALSE if there was an error
589  **/
590 gboolean
g_output_stream_vprintf(GOutputStream * stream,gsize * bytes_written,GCancellable * cancellable,GError ** error,const gchar * format,va_list args)591 g_output_stream_vprintf (GOutputStream  *stream,
592                          gsize          *bytes_written,
593                          GCancellable   *cancellable,
594                          GError        **error,
595                          const gchar    *format,
596                          va_list         args)
597 {
598   gchar    *text;
599   gboolean  success;
600 
601   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
602   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
603   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
604   g_return_val_if_fail (format != NULL, FALSE);
605 
606   text = g_strdup_vprintf (format, args);
607   success = g_output_stream_write_all (stream,
608                                        text, strlen (text),
609                                        bytes_written, cancellable, error);
610   g_free (text);
611 
612   return success;
613 }
614 
615 /**
616  * g_output_stream_write_bytes:
617  * @stream: a #GOutputStream.
618  * @bytes: the #GBytes to write
619  * @cancellable: (nullable): optional cancellable object
620  * @error: location to store the error occurring, or %NULL to ignore
621  *
622  * A wrapper function for g_output_stream_write() which takes a
623  * #GBytes as input.  This can be more convenient for use by language
624  * bindings or in other cases where the refcounted nature of #GBytes
625  * is helpful over a bare pointer interface.
626  *
627  * However, note that this function may still perform partial writes,
628  * just like g_output_stream_write().  If that occurs, to continue
629  * writing, you will need to create a new #GBytes containing just the
630  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
631  * #GBytes instance multiple times potentially can result in duplicated
632  * data in the output stream.
633  *
634  * Returns: Number of bytes written, or -1 on error
635  **/
636 gssize
g_output_stream_write_bytes(GOutputStream * stream,GBytes * bytes,GCancellable * cancellable,GError ** error)637 g_output_stream_write_bytes (GOutputStream  *stream,
638 			     GBytes         *bytes,
639 			     GCancellable   *cancellable,
640 			     GError        **error)
641 {
642   gsize size;
643   gconstpointer data;
644 
645   data = g_bytes_get_data (bytes, &size);
646 
647   return g_output_stream_write (stream,
648                                 data, size,
649 				cancellable,
650 				error);
651 }
652 
653 /**
654  * g_output_stream_flush:
655  * @stream: a #GOutputStream.
656  * @cancellable: (nullable): optional cancellable object
657  * @error: location to store the error occurring, or %NULL to ignore
658  *
659  * Forces a write of all user-space buffered data for the given
660  * @stream. Will block during the operation. Closing the stream will
661  * implicitly cause a flush.
662  *
663  * This function is optional for inherited classes.
664  *
665  * If @cancellable is not %NULL, then the operation can be cancelled by
666  * triggering the cancellable object from another thread. If the operation
667  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
668  *
669  * Returns: %TRUE on success, %FALSE on error
670  **/
671 gboolean
g_output_stream_flush(GOutputStream * stream,GCancellable * cancellable,GError ** error)672 g_output_stream_flush (GOutputStream  *stream,
673                        GCancellable   *cancellable,
674                        GError        **error)
675 {
676   GOutputStreamClass *class;
677   gboolean res;
678 
679   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
680 
681   if (!g_output_stream_set_pending (stream, error))
682     return FALSE;
683 
684   class = G_OUTPUT_STREAM_GET_CLASS (stream);
685 
686   res = TRUE;
687   if (class->flush)
688     {
689       if (cancellable)
690 	g_cancellable_push_current (cancellable);
691 
692       res = class->flush (stream, cancellable, error);
693 
694       if (cancellable)
695 	g_cancellable_pop_current (cancellable);
696     }
697 
698   g_output_stream_clear_pending (stream);
699 
700   return res;
701 }
702 
703 /**
704  * g_output_stream_splice:
705  * @stream: a #GOutputStream.
706  * @source: a #GInputStream.
707  * @flags: a set of #GOutputStreamSpliceFlags.
708  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
709  * @error: a #GError location to store the error occurring, or %NULL to
710  * ignore.
711  *
712  * Splices an input stream into an output stream.
713  *
714  * Returns: a #gssize containing the size of the data spliced, or
715  *     -1 if an error occurred. Note that if the number of bytes
716  *     spliced is greater than %G_MAXSSIZE, then that will be
717  *     returned, and there is no way to determine the actual number
718  *     of bytes spliced.
719  **/
720 gssize
g_output_stream_splice(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,GCancellable * cancellable,GError ** error)721 g_output_stream_splice (GOutputStream             *stream,
722 			GInputStream              *source,
723 			GOutputStreamSpliceFlags   flags,
724 			GCancellable              *cancellable,
725 			GError                   **error)
726 {
727   GOutputStreamClass *class;
728   gssize bytes_copied;
729 
730   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
731   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
732 
733   if (g_input_stream_is_closed (source))
734     {
735       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
736                            _("Source stream is already closed"));
737       return -1;
738     }
739 
740   if (!g_output_stream_set_pending (stream, error))
741     return -1;
742 
743   class = G_OUTPUT_STREAM_GET_CLASS (stream);
744 
745   if (cancellable)
746     g_cancellable_push_current (cancellable);
747 
748   bytes_copied = class->splice (stream, source, flags, cancellable, error);
749 
750   if (cancellable)
751     g_cancellable_pop_current (cancellable);
752 
753   g_output_stream_clear_pending (stream);
754 
755   return bytes_copied;
756 }
757 
758 static gssize
g_output_stream_real_splice(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,GCancellable * cancellable,GError ** error)759 g_output_stream_real_splice (GOutputStream             *stream,
760                              GInputStream              *source,
761                              GOutputStreamSpliceFlags   flags,
762                              GCancellable              *cancellable,
763                              GError                   **error)
764 {
765   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
766   gssize n_read, n_written;
767   gsize bytes_copied;
768   char buffer[8192], *p;
769   gboolean res;
770 
771   bytes_copied = 0;
772   if (class->write_fn == NULL)
773     {
774       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
775                            _("Output stream doesn’t implement write"));
776       res = FALSE;
777       goto notsupported;
778     }
779 
780   res = TRUE;
781   do
782     {
783       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
784       if (n_read == -1)
785 	{
786 	  res = FALSE;
787 	  break;
788 	}
789 
790       if (n_read == 0)
791 	break;
792 
793       p = buffer;
794       while (n_read > 0)
795 	{
796 	  n_written = class->write_fn (stream, p, n_read, cancellable, error);
797 	  if (n_written == -1)
798 	    {
799 	      res = FALSE;
800 	      break;
801 	    }
802 
803 	  p += n_written;
804 	  n_read -= n_written;
805 	  bytes_copied += n_written;
806 	}
807 
808       if (bytes_copied > G_MAXSSIZE)
809 	bytes_copied = G_MAXSSIZE;
810     }
811   while (res);
812 
813  notsupported:
814   if (!res)
815     error = NULL; /* Ignore further errors */
816 
817   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
818     {
819       /* Don't care about errors in source here */
820       g_input_stream_close (source, cancellable, NULL);
821     }
822 
823   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
824     {
825       /* But write errors on close are bad! */
826       if (!g_output_stream_internal_close (stream, cancellable, error))
827         res = FALSE;
828     }
829 
830   if (res)
831     return bytes_copied;
832 
833   return -1;
834 }
835 
836 /* Must always be called inside
837  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
838 static gboolean
g_output_stream_internal_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)839 g_output_stream_internal_close (GOutputStream  *stream,
840                                 GCancellable   *cancellable,
841                                 GError        **error)
842 {
843   GOutputStreamClass *class;
844   gboolean res;
845 
846   if (stream->priv->closed)
847     return TRUE;
848 
849   class = G_OUTPUT_STREAM_GET_CLASS (stream);
850 
851   stream->priv->closing = TRUE;
852 
853   if (cancellable)
854     g_cancellable_push_current (cancellable);
855 
856   if (class->flush)
857     res = class->flush (stream, cancellable, error);
858   else
859     res = TRUE;
860 
861   if (!res)
862     {
863       /* flushing caused the error that we want to return,
864        * but we still want to close the underlying stream if possible
865        */
866       if (class->close_fn)
867         class->close_fn (stream, cancellable, NULL);
868     }
869   else
870     {
871       res = TRUE;
872       if (class->close_fn)
873         res = class->close_fn (stream, cancellable, error);
874     }
875 
876   if (cancellable)
877     g_cancellable_pop_current (cancellable);
878 
879   stream->priv->closing = FALSE;
880   stream->priv->closed = TRUE;
881 
882   return res;
883 }
884 
885 /**
886  * g_output_stream_close:
887  * @stream: A #GOutputStream.
888  * @cancellable: (nullable): optional cancellable object
889  * @error: location to store the error occurring, or %NULL to ignore
890  *
891  * Closes the stream, releasing resources related to it.
892  *
893  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
894  * Closing a stream multiple times will not return an error.
895  *
896  * Closing a stream will automatically flush any outstanding buffers in the
897  * stream.
898  *
899  * Streams will be automatically closed when the last reference
900  * is dropped, but you might want to call this function to make sure
901  * resources are released as early as possible.
902  *
903  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
904  * open after the stream is closed. See the documentation for the individual
905  * stream for details.
906  *
907  * On failure the first error that happened will be reported, but the close
908  * operation will finish as much as possible. A stream that failed to
909  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
910  * is important to check and report the error to the user, otherwise
911  * there might be a loss of data as all data might not be written.
912  *
913  * If @cancellable is not %NULL, then the operation can be cancelled by
914  * triggering the cancellable object from another thread. If the operation
915  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
 * Cancelling a close will still leave the stream closed, but some streams
917  * can use a faster close that doesn't block to e.g. check errors. On
918  * cancellation (as with any error) there is no guarantee that all written
919  * data will reach the target.
920  *
921  * Returns: %TRUE on success, %FALSE on failure
922  **/
923 gboolean
g_output_stream_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)924 g_output_stream_close (GOutputStream  *stream,
925 		       GCancellable   *cancellable,
926 		       GError        **error)
927 {
928   gboolean res;
929 
930   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
931 
932   if (stream->priv->closed)
933     return TRUE;
934 
935   if (!g_output_stream_set_pending (stream, error))
936     return FALSE;
937 
938   res = g_output_stream_internal_close (stream, cancellable, error);
939 
940   g_output_stream_clear_pending (stream);
941 
942   return res;
943 }
944 
945 static void
async_ready_write_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)946 async_ready_write_callback_wrapper (GObject      *source_object,
947                                     GAsyncResult *res,
948                                     gpointer      user_data)
949 {
950   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
951   GOutputStreamClass *class;
952   GTask *task = user_data;
953   gssize nwrote;
954   GError *error = NULL;
955 
956   g_output_stream_clear_pending (stream);
957 
958   if (g_async_result_legacy_propagate_error (res, &error))
959     nwrote = -1;
960   else
961     {
962       class = G_OUTPUT_STREAM_GET_CLASS (stream);
963       nwrote = class->write_finish (stream, res, &error);
964     }
965 
966   if (nwrote >= 0)
967     g_task_return_int (task, nwrote);
968   else
969     g_task_return_error (task, error);
970   g_object_unref (task);
971 }
972 
973 /**
974  * g_output_stream_write_async:
975  * @stream: A #GOutputStream.
976  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
977  * @count: the number of bytes to write
978  * @io_priority: the io priority of the request.
979  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
980  * @callback: (scope async): callback to call when the request is satisfied
981  * @user_data: (closure): the data to pass to callback function
982  *
983  * Request an asynchronous write of @count bytes from @buffer into
984  * the stream. When the operation is finished @callback will be called.
985  * You can then call g_output_stream_write_finish() to get the result of the
986  * operation.
987  *
988  * During an async request no other sync and async calls are allowed,
989  * and will result in %G_IO_ERROR_PENDING errors.
990  *
991  * A value of @count larger than %G_MAXSSIZE will cause a
992  * %G_IO_ERROR_INVALID_ARGUMENT error.
993  *
994  * On success, the number of bytes written will be passed to the
995  * @callback. It is not an error if this is not the same as the
996  * requested size, as it can happen e.g. on a partial I/O error,
997  * but generally we try to write as many bytes as requested.
998  *
999  * You are guaranteed that this method will never fail with
1000  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
1001  * method will just wait until this changes.
1002  *
1003  * Any outstanding I/O request with higher priority (lower numerical
1004  * value) will be executed before an outstanding request with lower
1005  * priority. Default priority is %G_PRIORITY_DEFAULT.
1006  *
1007  * The asynchronous methods have a default fallback that uses threads
1008  * to implement asynchronicity, so they are optional for inheriting
1009  * classes. However, if you override one you must override all.
1010  *
1011  * For the synchronous, blocking version of this function, see
1012  * g_output_stream_write().
1013  *
1014  * Note that no copy of @buffer will be made, so it must stay valid
1015  * until @callback is called. See g_output_stream_write_bytes_async()
1016  * for a #GBytes version that will automatically hold a reference to
1017  * the contents (without copying) for the duration of the call.
1018  */
1019 void
g_output_stream_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1020 g_output_stream_write_async (GOutputStream       *stream,
1021 			     const void          *buffer,
1022 			     gsize                count,
1023 			     int                  io_priority,
1024 			     GCancellable        *cancellable,
1025 			     GAsyncReadyCallback  callback,
1026 			     gpointer             user_data)
1027 {
1028   GOutputStreamClass *class;
1029   GError *error = NULL;
1030   GTask *task;
1031 
1032   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1033   g_return_if_fail (buffer != NULL);
1034 
1035   task = g_task_new (stream, cancellable, callback, user_data);
1036   g_task_set_source_tag (task, g_output_stream_write_async);
1037   g_task_set_priority (task, io_priority);
1038 
1039   if (count == 0)
1040     {
1041       g_task_return_int (task, 0);
1042       g_object_unref (task);
1043       return;
1044     }
1045 
1046   if (((gssize) count) < 0)
1047     {
1048       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1049                                _("Too large count value passed to %s"),
1050                                G_STRFUNC);
1051       g_object_unref (task);
1052       return;
1053     }
1054 
1055   if (!g_output_stream_set_pending (stream, &error))
1056     {
1057       g_task_return_error (task, error);
1058       g_object_unref (task);
1059       return;
1060     }
1061 
1062   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1063 
1064   class->write_async (stream, buffer, count, io_priority, cancellable,
1065                       async_ready_write_callback_wrapper, task);
1066 }
1067 
1068 /**
1069  * g_output_stream_write_finish:
1070  * @stream: a #GOutputStream.
1071  * @result: a #GAsyncResult.
1072  * @error: a #GError location to store the error occurring, or %NULL to
1073  * ignore.
1074  *
1075  * Finishes a stream write operation.
1076  *
1077  * Returns: a #gssize containing the number of bytes written to the stream.
1078  **/
1079 gssize
g_output_stream_write_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1080 g_output_stream_write_finish (GOutputStream  *stream,
1081                               GAsyncResult   *result,
1082                               GError        **error)
1083 {
1084   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1085   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1086   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
1087 
1088   /* @result is always the GTask created by g_output_stream_write_async();
1089    * we called class->write_finish() from async_ready_write_callback_wrapper.
1090    */
1091   return g_task_propagate_int (G_TASK (result), error);
1092 }
1093 
/* Per-operation state attached to the GTask created by
 * g_output_stream_write_all_async().  Only the struct itself is released
 * by free_async_write_all(); the buffer is never freed here.
 */
typedef struct
{
  const guint8 *buffer;   /* start of the caller's buffer (not freed with the struct) */
  gsize to_write;         /* bytes still to write; counted down by write_all_callback() */
  gsize bytes_written;    /* bytes written so far, reported by the finish function */
} AsyncWriteAll;
1100 
1101 static void
free_async_write_all(gpointer data)1102 free_async_write_all (gpointer data)
1103 {
1104   g_slice_free (AsyncWriteAll, data);
1105 }
1106 
1107 static void
write_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)1108 write_all_callback (GObject      *stream,
1109                     GAsyncResult *result,
1110                     gpointer      user_data)
1111 {
1112   GTask *task = user_data;
1113   AsyncWriteAll *data = g_task_get_task_data (task);
1114 
1115   if (result)
1116     {
1117       GError *error = NULL;
1118       gssize nwritten;
1119 
1120       nwritten = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), result, &error);
1121 
1122       if (nwritten == -1)
1123         {
1124           g_task_return_error (task, error);
1125           g_object_unref (task);
1126           return;
1127         }
1128 
1129       g_assert_cmpint (nwritten, <=, data->to_write);
1130       g_warn_if_fail (nwritten > 0);
1131 
1132       data->to_write -= nwritten;
1133       data->bytes_written += nwritten;
1134     }
1135 
1136   if (data->to_write == 0)
1137     {
1138       g_task_return_boolean (task, TRUE);
1139       g_object_unref (task);
1140     }
1141   else
1142     g_output_stream_write_async (G_OUTPUT_STREAM (stream),
1143                                  data->buffer + data->bytes_written,
1144                                  data->to_write,
1145                                  g_task_get_priority (task),
1146                                  g_task_get_cancellable (task),
1147                                  write_all_callback, task);
1148 }
1149 
1150 static void
write_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1151 write_all_async_thread (GTask        *task,
1152                         gpointer      source_object,
1153                         gpointer      task_data,
1154                         GCancellable *cancellable)
1155 {
1156   GOutputStream *stream = source_object;
1157   AsyncWriteAll *data = task_data;
1158   GError *error = NULL;
1159 
1160   if (g_output_stream_write_all (stream, data->buffer, data->to_write, &data->bytes_written,
1161                                  g_task_get_cancellable (task), &error))
1162     g_task_return_boolean (task, TRUE);
1163   else
1164     g_task_return_error (task, error);
1165 }
1166 
1167 /**
1168  * g_output_stream_write_all_async:
1169  * @stream: A #GOutputStream
1170  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write
1171  * @count: the number of bytes to write
1172  * @io_priority: the io priority of the request
1173  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
1174  * @callback: (scope async): callback to call when the request is satisfied
1175  * @user_data: (closure): the data to pass to callback function
1176  *
1177  * Request an asynchronous write of @count bytes from @buffer into
1178  * the stream. When the operation is finished @callback will be called.
1179  * You can then call g_output_stream_write_all_finish() to get the result of the
1180  * operation.
1181  *
1182  * This is the asynchronous version of g_output_stream_write_all().
1183  *
1184  * Call g_output_stream_write_all_finish() to collect the result.
1185  *
1186  * Any outstanding I/O request with higher priority (lower numerical
1187  * value) will be executed before an outstanding request with lower
1188  * priority. Default priority is %G_PRIORITY_DEFAULT.
1189  *
1190  * Note that no copy of @buffer will be made, so it must stay valid
1191  * until @callback is called.
1192  *
1193  * Since: 2.44
1194  */
1195 void
g_output_stream_write_all_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1196 g_output_stream_write_all_async (GOutputStream       *stream,
1197                                  const void          *buffer,
1198                                  gsize                count,
1199                                  int                  io_priority,
1200                                  GCancellable        *cancellable,
1201                                  GAsyncReadyCallback  callback,
1202                                  gpointer             user_data)
1203 {
1204   AsyncWriteAll *data;
1205   GTask *task;
1206 
1207   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1208   g_return_if_fail (buffer != NULL || count == 0);
1209 
1210   task = g_task_new (stream, cancellable, callback, user_data);
1211   data = g_slice_new0 (AsyncWriteAll);
1212   data->buffer = buffer;
1213   data->to_write = count;
1214 
1215   g_task_set_source_tag (task, g_output_stream_write_all_async);
1216   g_task_set_task_data (task, data, free_async_write_all);
1217   g_task_set_priority (task, io_priority);
1218 
1219   /* If async writes are going to be handled via the threadpool anyway
1220    * then we may as well do it with a single dispatch instead of
1221    * bouncing in and out.
1222    */
1223   if (g_output_stream_async_write_is_via_threads (stream))
1224     {
1225       g_task_run_in_thread (task, write_all_async_thread);
1226       g_object_unref (task);
1227     }
1228   else
1229     write_all_callback (G_OBJECT (stream), NULL, task);
1230 }
1231 
1232 /**
1233  * g_output_stream_write_all_finish:
1234  * @stream: a #GOutputStream
1235  * @result: a #GAsyncResult
1236  * @bytes_written: (out) (optional): location to store the number of bytes that was written to the stream
1237  * @error: a #GError location to store the error occurring, or %NULL to ignore.
1238  *
1239  * Finishes an asynchronous stream write operation started with
1240  * g_output_stream_write_all_async().
1241  *
1242  * As a special exception to the normal conventions for functions that
1243  * use #GError, if this function returns %FALSE (and sets @error) then
1244  * @bytes_written will be set to the number of bytes that were
1245  * successfully written before the error was encountered.  This
1246  * functionality is only available from C.  If you need it from another
1247  * language then you must write your own loop around
1248  * g_output_stream_write_async().
1249  *
1250  * Returns: %TRUE on success, %FALSE if there was an error
1251  *
1252  * Since: 2.44
1253  **/
1254 gboolean
g_output_stream_write_all_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)1255 g_output_stream_write_all_finish (GOutputStream  *stream,
1256                                   GAsyncResult   *result,
1257                                   gsize          *bytes_written,
1258                                   GError        **error)
1259 {
1260   GTask *task;
1261 
1262   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1263   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1264 
1265   task = G_TASK (result);
1266 
1267   if (bytes_written)
1268     {
1269       AsyncWriteAll *data = (AsyncWriteAll *)g_task_get_task_data (task);
1270 
1271       *bytes_written = data->bytes_written;
1272     }
1273 
1274   return g_task_propagate_boolean (task, error);
1275 }
1276 
1277 /**
1278  * g_output_stream_writev_async:
1279  * @stream: A #GOutputStream.
1280  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
1281  * @n_vectors: the number of vectors to write
1282  * @io_priority: the I/O priority of the request.
1283  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1284  * @callback: (scope async): callback to call when the request is satisfied
1285  * @user_data: (closure): the data to pass to callback function
1286  *
1287  * Request an asynchronous write of the bytes contained in @n_vectors @vectors into
1288  * the stream. When the operation is finished @callback will be called.
1289  * You can then call g_output_stream_writev_finish() to get the result of the
1290  * operation.
1291  *
1292  * During an async request no other sync and async calls are allowed,
1293  * and will result in %G_IO_ERROR_PENDING errors.
1294  *
1295  * On success, the number of bytes written will be passed to the
1296  * @callback. It is not an error if this is not the same as the
1297  * requested size, as it can happen e.g. on a partial I/O error,
1298  * but generally we try to write as many bytes as requested.
1299  *
1300  * You are guaranteed that this method will never fail with
1301  * %G_IO_ERROR_WOULD_BLOCK — if @stream can't accept more data, the
1302  * method will just wait until this changes.
1303  *
1304  * Any outstanding I/O request with higher priority (lower numerical
1305  * value) will be executed before an outstanding request with lower
1306  * priority. Default priority is %G_PRIORITY_DEFAULT.
1307  *
1308  * The asynchronous methods have a default fallback that uses threads
1309  * to implement asynchronicity, so they are optional for inheriting
1310  * classes. However, if you override one you must override all.
1311  *
1312  * For the synchronous, blocking version of this function, see
1313  * g_output_stream_writev().
1314  *
1315  * Note that no copy of @vectors will be made, so it must stay valid
1316  * until @callback is called.
1317  *
1318  * Since: 2.60
1319  */
void
g_output_stream_writev_async (GOutputStream             *stream,
			      const GOutputVector       *vectors,
			      gsize                      n_vectors,
			      int                        io_priority,
			      GCancellable              *cancellable,
			      GAsyncReadyCallback        callback,
			      gpointer                   user_data)
{
  GOutputStreamClass *class;

  /* Argument validation; a NULL @vectors is only accepted for an empty write */
  g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
  g_return_if_fail (vectors != NULL || n_vectors == 0);
  g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));

  class = G_OUTPUT_STREAM_GET_CLASS (stream);
  g_return_if_fail (class->writev_async != NULL);

  /* Dispatch straight to the class implementation.  Unlike
   * g_output_stream_write_async(), no wrapping GTask is created here and
   * the pending flag is not set by this function; @callback is handed to
   * the vfunc unchanged.
   */
  class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
                       callback, user_data);
}
1341 
1342 /**
1343  * g_output_stream_writev_finish:
1344  * @stream: a #GOutputStream.
1345  * @result: a #GAsyncResult.
1346  * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1347  * @error: a #GError location to store the error occurring, or %NULL to
1348  * ignore.
1349  *
1350  * Finishes a stream writev operation.
1351  *
1352  * Returns: %TRUE on success, %FALSE if there was an error
1353  *
1354  * Since: 2.60
1355  */
gboolean
g_output_stream_writev_finish (GOutputStream  *stream,
                               GAsyncResult   *result,
                               gsize          *bytes_written,
                               GError        **error)
{
  GOutputStreamClass *class;
  gboolean res;
  /* Local counter so the vfunc always has somewhere to write, even when
   * the caller passed a NULL @bytes_written.
   */
  gsize _bytes_written = 0;

  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);

  class = G_OUTPUT_STREAM_GET_CLASS (stream);
  g_return_val_if_fail (class->writev_finish != NULL, FALSE);

  res = class->writev_finish (stream, result, &_bytes_written, error);

  /* Sanity checks on the implementation: a failure must report zero
   * bytes written and, if the caller asked for errors, must set one.
   */
  g_warn_if_fail (res || _bytes_written == 0);
  g_warn_if_fail (res || (error == NULL || *error != NULL));

  if (bytes_written)
    *bytes_written = _bytes_written;

  return res;
}
1383 
/* Per-operation state attached to the GTask created by
 * g_output_stream_writev_all_async().  Only the struct itself is released
 * by free_async_writev_all().
 */
typedef struct
{
  GOutputVector *vectors; /* (unowned): caller's array; advanced past fully written vectors */
  gsize n_vectors;        /* number of vectors still to be written */
  gsize bytes_written;    /* bytes written so far, reported by the finish function */
} AsyncWritevAll;
1390 
1391 static void
free_async_writev_all(gpointer data)1392 free_async_writev_all (gpointer data)
1393 {
1394   g_slice_free (AsyncWritevAll, data);
1395 }
1396 
1397 static void
writev_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)1398 writev_all_callback (GObject      *stream,
1399                      GAsyncResult *result,
1400                      gpointer      user_data)
1401 {
1402   GTask *task = user_data;
1403   AsyncWritevAll *data = g_task_get_task_data (task);
1404   gint priority = g_task_get_priority (task);
1405   GCancellable *cancellable = g_task_get_cancellable (task);
1406 
1407   if (result)
1408     {
1409       GError *error = NULL;
1410       gboolean res;
1411       gsize n_written = 0;
1412 
1413       res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
1414 
1415       if (!res)
1416         {
1417           g_task_return_error (task, g_steal_pointer (&error));
1418           g_object_unref (task);
1419           return;
1420         }
1421 
1422       g_warn_if_fail (n_written > 0);
1423       data->bytes_written += n_written;
1424 
1425       /* skip vectors that have been written in full */
1426       while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
1427         {
1428           n_written -= data->vectors[0].size;
1429           ++data->vectors;
1430           --data->n_vectors;
1431         }
1432       /* skip partially written vector data */
1433       if (n_written > 0 && data->n_vectors > 0)
1434         {
1435           data->vectors[0].size -= n_written;
1436           data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
1437         }
1438     }
1439 
1440   if (data->n_vectors == 0)
1441     {
1442       g_task_return_boolean (task, TRUE);
1443       g_object_unref (task);
1444     }
1445   else
1446     g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
1447                                   data->vectors,
1448                                   data->n_vectors,
1449                                   priority,
1450                                   cancellable,
1451                                   writev_all_callback, g_steal_pointer (&task));
1452 }
1453 
1454 static void
writev_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1455 writev_all_async_thread (GTask        *task,
1456                          gpointer      source_object,
1457                          gpointer      task_data,
1458                          GCancellable *cancellable)
1459 {
1460   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1461   AsyncWritevAll *data = task_data;
1462   GError *error = NULL;
1463 
1464   if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
1465                                   g_task_get_cancellable (task), &error))
1466     g_task_return_boolean (task, TRUE);
1467   else
1468     g_task_return_error (task, g_steal_pointer (&error));
1469 }
1470 
1471 /**
1472  * g_output_stream_writev_all_async:
1473  * @stream: A #GOutputStream
1474  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
1475  * @n_vectors: the number of vectors to write
1476  * @io_priority: the I/O priority of the request
1477  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
1478  * @callback: (scope async): callback to call when the request is satisfied
1479  * @user_data: (closure): the data to pass to callback function
1480  *
1481  * Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
1482  * the stream. When the operation is finished @callback will be called.
1483  * You can then call g_output_stream_writev_all_finish() to get the result of the
1484  * operation.
1485  *
1486  * This is the asynchronous version of g_output_stream_writev_all().
1487  *
1488  * Call g_output_stream_writev_all_finish() to collect the result.
1489  *
1490  * Any outstanding I/O request with higher priority (lower numerical
1491  * value) will be executed before an outstanding request with lower
1492  * priority. Default priority is %G_PRIORITY_DEFAULT.
1493  *
1494  * Note that no copy of @vectors will be made, so it must stay valid
1495  * until @callback is called. The content of the individual elements
1496  * of @vectors might be changed by this function.
1497  *
1498  * Since: 2.60
1499  */
1500 void
g_output_stream_writev_all_async(GOutputStream * stream,GOutputVector * vectors,gsize n_vectors,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1501 g_output_stream_writev_all_async (GOutputStream       *stream,
1502                                   GOutputVector       *vectors,
1503                                   gsize                n_vectors,
1504                                   int                  io_priority,
1505                                   GCancellable        *cancellable,
1506                                   GAsyncReadyCallback  callback,
1507                                   gpointer             user_data)
1508 {
1509   AsyncWritevAll *data;
1510   GTask *task;
1511   gsize i, to_be_written = 0;
1512 
1513   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1514   g_return_if_fail (vectors != NULL || n_vectors == 0);
1515   g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
1516 
1517   task = g_task_new (stream, cancellable, callback, user_data);
1518   data = g_slice_new0 (AsyncWritevAll);
1519   data->vectors = vectors;
1520   data->n_vectors = n_vectors;
1521 
1522   g_task_set_source_tag (task, g_output_stream_writev_all_async);
1523   g_task_set_task_data (task, data, free_async_writev_all);
1524   g_task_set_priority (task, io_priority);
1525 
1526   /* We can't write more than G_MAXSIZE bytes overall, otherwise we
1527    * would overflow the bytes_written counter */
1528   for (i = 0; i < n_vectors; i++)
1529     {
1530        if (to_be_written > G_MAXSIZE - vectors[i].size)
1531          {
1532            g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1533                                     _("Sum of vectors passed to %s too large"),
1534                                     G_STRFUNC);
1535            g_object_unref (task);
1536            return;
1537          }
1538        to_be_written += vectors[i].size;
1539     }
1540 
1541   /* If async writes are going to be handled via the threadpool anyway
1542    * then we may as well do it with a single dispatch instead of
1543    * bouncing in and out.
1544    */
1545   if (g_output_stream_async_writev_is_via_threads (stream))
1546     {
1547       g_task_run_in_thread (task, writev_all_async_thread);
1548       g_object_unref (task);
1549     }
1550   else
1551     writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
1552 }
1553 
1554 /**
1555  * g_output_stream_writev_all_finish:
1556  * @stream: a #GOutputStream
1557  * @result: a #GAsyncResult
1558  * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1559  * @error: a #GError location to store the error occurring, or %NULL to ignore.
1560  *
1561  * Finishes an asynchronous stream write operation started with
1562  * g_output_stream_writev_all_async().
1563  *
1564  * As a special exception to the normal conventions for functions that
1565  * use #GError, if this function returns %FALSE (and sets @error) then
1566  * @bytes_written will be set to the number of bytes that were
1567  * successfully written before the error was encountered.  This
1568  * functionality is only available from C.  If you need it from another
1569  * language then you must write your own loop around
1570  * g_output_stream_writev_async().
1571  *
1572  * Returns: %TRUE on success, %FALSE if there was an error
1573  *
1574  * Since: 2.60
1575  */
1576 gboolean
g_output_stream_writev_all_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)1577 g_output_stream_writev_all_finish (GOutputStream  *stream,
1578                                    GAsyncResult   *result,
1579                                    gsize          *bytes_written,
1580                                    GError        **error)
1581 {
1582   GTask *task;
1583 
1584   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1585   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1586   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
1587 
1588   task = G_TASK (result);
1589 
1590   if (bytes_written)
1591     {
1592       AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
1593 
1594       *bytes_written = data->bytes_written;
1595     }
1596 
1597   return g_task_propagate_boolean (task, error);
1598 }
1599 
1600 static void
write_bytes_callback(GObject * stream,GAsyncResult * result,gpointer user_data)1601 write_bytes_callback (GObject      *stream,
1602                       GAsyncResult *result,
1603                       gpointer      user_data)
1604 {
1605   GTask *task = user_data;
1606   GError *error = NULL;
1607   gssize nwrote;
1608 
1609   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
1610                                          result, &error);
1611   if (nwrote == -1)
1612     g_task_return_error (task, error);
1613   else
1614     g_task_return_int (task, nwrote);
1615   g_object_unref (task);
1616 }
1617 
1618 /**
1619  * g_output_stream_write_bytes_async:
1620  * @stream: A #GOutputStream.
1621  * @bytes: The bytes to write
1622  * @io_priority: the io priority of the request.
1623  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1624  * @callback: (scope async): callback to call when the request is satisfied
1625  * @user_data: (closure): the data to pass to callback function
1626  *
1627  * This function is similar to g_output_stream_write_async(), but
1628  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
1629  * this allows the stream to avoid taking a copy of the data.
1630  *
1631  * However, note that this function may still perform partial writes,
1632  * just like g_output_stream_write_async(). If that occurs, to continue
1633  * writing, you will need to create a new #GBytes containing just the
1634  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
1635  * #GBytes instance multiple times potentially can result in duplicated
1636  * data in the output stream.
1637  *
1638  * For the synchronous, blocking version of this function, see
1639  * g_output_stream_write_bytes().
1640  **/
1641 void
g_output_stream_write_bytes_async(GOutputStream * stream,GBytes * bytes,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1642 g_output_stream_write_bytes_async (GOutputStream       *stream,
1643 				   GBytes              *bytes,
1644 				   int                  io_priority,
1645 				   GCancellable        *cancellable,
1646 				   GAsyncReadyCallback  callback,
1647 				   gpointer             user_data)
1648 {
1649   GTask *task;
1650   gsize size;
1651   gconstpointer data;
1652 
1653   data = g_bytes_get_data (bytes, &size);
1654 
1655   task = g_task_new (stream, cancellable, callback, user_data);
1656   g_task_set_source_tag (task, g_output_stream_write_bytes_async);
1657   g_task_set_task_data (task, g_bytes_ref (bytes),
1658                         (GDestroyNotify) g_bytes_unref);
1659 
1660   g_output_stream_write_async (stream,
1661                                data, size,
1662                                io_priority,
1663                                cancellable,
1664                                write_bytes_callback,
1665                                task);
1666 }
1667 
1668 /**
1669  * g_output_stream_write_bytes_finish:
1670  * @stream: a #GOutputStream.
1671  * @result: a #GAsyncResult.
1672  * @error: a #GError location to store the error occurring, or %NULL to
1673  * ignore.
1674  *
1675  * Finishes a stream write-from-#GBytes operation.
1676  *
1677  * Returns: a #gssize containing the number of bytes written to the stream.
1678  **/
1679 gssize
g_output_stream_write_bytes_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1680 g_output_stream_write_bytes_finish (GOutputStream  *stream,
1681 				    GAsyncResult   *result,
1682 				    GError        **error)
1683 {
1684   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1685   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1686 
1687   return g_task_propagate_int (G_TASK (result), error);
1688 }
1689 
1690 static void
async_ready_splice_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer _data)1691 async_ready_splice_callback_wrapper (GObject      *source_object,
1692                                      GAsyncResult *res,
1693                                      gpointer     _data)
1694 {
1695   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1696   GOutputStreamClass *class;
1697   GTask *task = _data;
1698   gssize nspliced;
1699   GError *error = NULL;
1700 
1701   g_output_stream_clear_pending (stream);
1702 
1703   if (g_async_result_legacy_propagate_error (res, &error))
1704     nspliced = -1;
1705   else
1706     {
1707       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1708       nspliced = class->splice_finish (stream, res, &error);
1709     }
1710 
1711   if (nspliced >= 0)
1712     g_task_return_int (task, nspliced);
1713   else
1714     g_task_return_error (task, error);
1715   g_object_unref (task);
1716 }
1717 
1718 /**
1719  * g_output_stream_splice_async:
1720  * @stream: a #GOutputStream.
1721  * @source: a #GInputStream.
1722  * @flags: a set of #GOutputStreamSpliceFlags.
1723  * @io_priority: the io priority of the request.
1724  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1725  * @callback: (scope async): a #GAsyncReadyCallback.
1726  * @user_data: (closure): user data passed to @callback.
1727  *
1728  * Splices a stream asynchronously.
1729  * When the operation is finished @callback will be called.
1730  * You can then call g_output_stream_splice_finish() to get the
1731  * result of the operation.
1732  *
1733  * For the synchronous, blocking version of this function, see
1734  * g_output_stream_splice().
1735  **/
1736 void
g_output_stream_splice_async(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1737 g_output_stream_splice_async (GOutputStream            *stream,
1738 			      GInputStream             *source,
1739 			      GOutputStreamSpliceFlags  flags,
1740 			      int                       io_priority,
1741 			      GCancellable             *cancellable,
1742 			      GAsyncReadyCallback       callback,
1743 			      gpointer                  user_data)
1744 {
1745   GOutputStreamClass *class;
1746   GTask *task;
1747   GError *error = NULL;
1748 
1749   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1750   g_return_if_fail (G_IS_INPUT_STREAM (source));
1751 
1752   task = g_task_new (stream, cancellable, callback, user_data);
1753   g_task_set_source_tag (task, g_output_stream_splice_async);
1754   g_task_set_priority (task, io_priority);
1755   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1756 
1757   if (g_input_stream_is_closed (source))
1758     {
1759       g_task_return_new_error (task,
1760                                G_IO_ERROR, G_IO_ERROR_CLOSED,
1761                                _("Source stream is already closed"));
1762       g_object_unref (task);
1763       return;
1764     }
1765 
1766   if (!g_output_stream_set_pending (stream, &error))
1767     {
1768       g_task_return_error (task, error);
1769       g_object_unref (task);
1770       return;
1771     }
1772 
1773   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1774 
1775   class->splice_async (stream, source, flags, io_priority, cancellable,
1776                        async_ready_splice_callback_wrapper, task);
1777 }
1778 
1779 /**
1780  * g_output_stream_splice_finish:
1781  * @stream: a #GOutputStream.
1782  * @result: a #GAsyncResult.
1783  * @error: a #GError location to store the error occurring, or %NULL to
1784  * ignore.
1785  *
1786  * Finishes an asynchronous stream splice operation.
1787  *
1788  * Returns: a #gssize of the number of bytes spliced. Note that if the
1789  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1790  *     will be returned, and there is no way to determine the actual
1791  *     number of bytes spliced.
1792  **/
1793 gssize
g_output_stream_splice_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1794 g_output_stream_splice_finish (GOutputStream  *stream,
1795 			       GAsyncResult   *result,
1796 			       GError        **error)
1797 {
1798   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1799   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1800   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1801 
1802   /* @result is always the GTask created by g_output_stream_splice_async();
1803    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1804    */
1805   return g_task_propagate_int (G_TASK (result), error);
1806 }
1807 
1808 static void
async_ready_flush_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1809 async_ready_flush_callback_wrapper (GObject      *source_object,
1810                                     GAsyncResult *res,
1811                                     gpointer      user_data)
1812 {
1813   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1814   GOutputStreamClass *class;
1815   GTask *task = user_data;
1816   gboolean flushed;
1817   GError *error = NULL;
1818 
1819   g_output_stream_clear_pending (stream);
1820 
1821   if (g_async_result_legacy_propagate_error (res, &error))
1822     flushed = FALSE;
1823   else
1824     {
1825       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1826       flushed = class->flush_finish (stream, res, &error);
1827     }
1828 
1829   if (flushed)
1830     g_task_return_boolean (task, TRUE);
1831   else
1832     g_task_return_error (task, error);
1833   g_object_unref (task);
1834 }
1835 
1836 /**
1837  * g_output_stream_flush_async:
1838  * @stream: a #GOutputStream.
1839  * @io_priority: the io priority of the request.
1840  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1841  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1842  * @user_data: (closure): the data to pass to callback function
1843  *
1844  * Forces an asynchronous write of all user-space buffered data for
1845  * the given @stream.
1846  * For behaviour details see g_output_stream_flush().
1847  *
1848  * When the operation is finished @callback will be
1849  * called. You can then call g_output_stream_flush_finish() to get the
1850  * result of the operation.
1851  **/
1852 void
g_output_stream_flush_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1853 g_output_stream_flush_async (GOutputStream       *stream,
1854                              int                  io_priority,
1855                              GCancellable        *cancellable,
1856                              GAsyncReadyCallback  callback,
1857                              gpointer             user_data)
1858 {
1859   GOutputStreamClass *class;
1860   GTask *task;
1861   GError *error = NULL;
1862 
1863   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1864 
1865   task = g_task_new (stream, cancellable, callback, user_data);
1866   g_task_set_source_tag (task, g_output_stream_flush_async);
1867   g_task_set_priority (task, io_priority);
1868 
1869   if (!g_output_stream_set_pending (stream, &error))
1870     {
1871       g_task_return_error (task, error);
1872       g_object_unref (task);
1873       return;
1874     }
1875 
1876   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1877 
1878   if (class->flush_async == NULL)
1879     {
1880       g_output_stream_clear_pending (stream);
1881       g_task_return_boolean (task, TRUE);
1882       g_object_unref (task);
1883       return;
1884     }
1885 
1886   class->flush_async (stream, io_priority, cancellable,
1887                       async_ready_flush_callback_wrapper, task);
1888 }
1889 
1890 /**
1891  * g_output_stream_flush_finish:
1892  * @stream: a #GOutputStream.
1893  * @result: a GAsyncResult.
1894  * @error: a #GError location to store the error occurring, or %NULL to
1895  * ignore.
1896  *
1897  * Finishes flushing an output stream.
1898  *
1899  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1900  **/
1901 gboolean
g_output_stream_flush_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1902 g_output_stream_flush_finish (GOutputStream  *stream,
1903                               GAsyncResult   *result,
1904                               GError        **error)
1905 {
1906   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1907   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1908   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1909 
1910   /* @result is always the GTask created by g_output_stream_flush_async();
1911    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1912    */
1913   return g_task_propagate_boolean (G_TASK (result), error);
1914 }
1915 
1916 
1917 static void
async_ready_close_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1918 async_ready_close_callback_wrapper (GObject      *source_object,
1919                                     GAsyncResult *res,
1920                                     gpointer      user_data)
1921 {
1922   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1923   GOutputStreamClass *class;
1924   GTask *task = user_data;
1925   GError *error = g_task_get_task_data (task);
1926 
1927   stream->priv->closing = FALSE;
1928   stream->priv->closed = TRUE;
1929 
1930   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1931     {
1932       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1933 
1934       class->close_finish (stream, res,
1935                            error ? NULL : &error);
1936     }
1937 
1938   if (error != NULL)
1939     g_task_return_error (task, error);
1940   else
1941     g_task_return_boolean (task, TRUE);
1942   g_object_unref (task);
1943 }
1944 
1945 static void
async_ready_close_flushed_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1946 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1947                                             GAsyncResult *res,
1948                                             gpointer      user_data)
1949 {
1950   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1951   GOutputStreamClass *class;
1952   GTask *task = user_data;
1953   GError *error = NULL;
1954 
1955   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1956 
1957   if (!g_async_result_legacy_propagate_error (res, &error))
1958     {
1959       class->flush_finish (stream, res, &error);
1960     }
1961 
1962   /* propagate the possible error */
1963   if (error)
1964     g_task_set_task_data (task, error, NULL);
1965 
1966   /* we still close, even if there was a flush error */
1967   class->close_async (stream,
1968                       g_task_get_priority (task),
1969                       g_task_get_cancellable (task),
1970                       async_ready_close_callback_wrapper, task);
1971 }
1972 
1973 static void
real_close_async_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1974 real_close_async_cb (GObject      *source_object,
1975                      GAsyncResult *res,
1976                      gpointer      user_data)
1977 {
1978   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1979   GTask *task = user_data;
1980   GError *error = NULL;
1981   gboolean ret;
1982 
1983   g_output_stream_clear_pending (stream);
1984 
1985   ret = g_output_stream_internal_close_finish (stream, res, &error);
1986 
1987   if (error != NULL)
1988     g_task_return_error (task, error);
1989   else
1990     g_task_return_boolean (task, ret);
1991 
1992   g_object_unref (task);
1993 }
1994 
1995 /**
1996  * g_output_stream_close_async:
1997  * @stream: A #GOutputStream.
1998  * @io_priority: the io priority of the request.
1999  * @cancellable: (nullable): optional cancellable object
2000  * @callback: (scope async): callback to call when the request is satisfied
2001  * @user_data: (closure): the data to pass to callback function
2002  *
2003  * Requests an asynchronous close of the stream, releasing resources
2004  * related to it. When the operation is finished @callback will be
2005  * called. You can then call g_output_stream_close_finish() to get
2006  * the result of the operation.
2007  *
2008  * For behaviour details see g_output_stream_close().
2009  *
2010  * The asynchronous methods have a default fallback that uses threads
2011  * to implement asynchronicity, so they are optional for inheriting
2012  * classes. However, if you override one you must override all.
2013  **/
2014 void
g_output_stream_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2015 g_output_stream_close_async (GOutputStream       *stream,
2016                              int                  io_priority,
2017                              GCancellable        *cancellable,
2018                              GAsyncReadyCallback  callback,
2019                              gpointer             user_data)
2020 {
2021   GTask *task;
2022   GError *error = NULL;
2023 
2024   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
2025 
2026   task = g_task_new (stream, cancellable, callback, user_data);
2027   g_task_set_source_tag (task, g_output_stream_close_async);
2028   g_task_set_priority (task, io_priority);
2029 
2030   if (!g_output_stream_set_pending (stream, &error))
2031     {
2032       g_task_return_error (task, error);
2033       g_object_unref (task);
2034       return;
2035     }
2036 
2037   g_output_stream_internal_close_async (stream, io_priority, cancellable,
2038                                         real_close_async_cb, task);
2039 }
2040 
2041 /* Must always be called inside
2042  * g_output_stream_set_pending()/g_output_stream_clear_pending().
2043  */
2044 void
g_output_stream_internal_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2045 g_output_stream_internal_close_async (GOutputStream       *stream,
2046                                       int                  io_priority,
2047                                       GCancellable        *cancellable,
2048                                       GAsyncReadyCallback  callback,
2049                                       gpointer             user_data)
2050 {
2051   GOutputStreamClass *class;
2052   GTask *task;
2053 
2054   task = g_task_new (stream, cancellable, callback, user_data);
2055   g_task_set_source_tag (task, g_output_stream_internal_close_async);
2056   g_task_set_priority (task, io_priority);
2057 
2058   if (stream->priv->closed)
2059     {
2060       g_task_return_boolean (task, TRUE);
2061       g_object_unref (task);
2062       return;
2063     }
2064 
2065   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2066   stream->priv->closing = TRUE;
2067 
2068   /* Call close_async directly if there is no need to flush, or if the flush
2069      can be done sync (in the output stream async close thread) */
2070   if (class->flush_async == NULL ||
2071       (class->flush_async == g_output_stream_real_flush_async &&
2072        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
2073     {
2074       class->close_async (stream, io_priority, cancellable,
2075                           async_ready_close_callback_wrapper, task);
2076     }
2077   else
2078     {
2079       /* First do an async flush, then do the async close in the callback
2080          wrapper (see async_ready_close_flushed_callback_wrapper) */
2081       class->flush_async (stream, io_priority, cancellable,
2082                           async_ready_close_flushed_callback_wrapper, task);
2083     }
2084 }
2085 
2086 static gboolean
g_output_stream_internal_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2087 g_output_stream_internal_close_finish (GOutputStream  *stream,
2088                                        GAsyncResult   *result,
2089                                        GError        **error)
2090 {
2091   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2092   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2093   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
2094 
2095   return g_task_propagate_boolean (G_TASK (result), error);
2096 }
2097 
2098 /**
2099  * g_output_stream_close_finish:
2100  * @stream: a #GOutputStream.
2101  * @result: a #GAsyncResult.
2102  * @error: a #GError location to store the error occurring, or %NULL to
2103  * ignore.
2104  *
2105  * Closes an output stream.
2106  *
2107  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
2108  **/
2109 gboolean
g_output_stream_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2110 g_output_stream_close_finish (GOutputStream  *stream,
2111                               GAsyncResult   *result,
2112                               GError        **error)
2113 {
2114   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2115   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2116   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
2117 
2118   /* @result is always the GTask created by g_output_stream_close_async();
2119    * we called class->close_finish() from async_ready_close_callback_wrapper.
2120    */
2121   return g_task_propagate_boolean (G_TASK (result), error);
2122 }
2123 
2124 /**
2125  * g_output_stream_is_closed:
2126  * @stream: a #GOutputStream.
2127  *
2128  * Checks if an output stream has already been closed.
2129  *
2130  * Returns: %TRUE if @stream is closed. %FALSE otherwise.
2131  **/
2132 gboolean
g_output_stream_is_closed(GOutputStream * stream)2133 g_output_stream_is_closed (GOutputStream *stream)
2134 {
2135   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
2136 
2137   return stream->priv->closed;
2138 }
2139 
2140 /**
2141  * g_output_stream_is_closing:
2142  * @stream: a #GOutputStream.
2143  *
2144  * Checks if an output stream is being closed. This can be
2145  * used inside e.g. a flush implementation to see if the
2146  * flush (or other i/o operation) is called from within
2147  * the closing operation.
2148  *
2149  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
2150  *
2151  * Since: 2.24
2152  **/
2153 gboolean
g_output_stream_is_closing(GOutputStream * stream)2154 g_output_stream_is_closing (GOutputStream *stream)
2155 {
2156   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
2157 
2158   return stream->priv->closing;
2159 }
2160 
2161 /**
2162  * g_output_stream_has_pending:
2163  * @stream: a #GOutputStream.
2164  *
2165  * Checks if an output stream has pending actions.
2166  *
2167  * Returns: %TRUE if @stream has pending actions.
2168  **/
2169 gboolean
g_output_stream_has_pending(GOutputStream * stream)2170 g_output_stream_has_pending (GOutputStream *stream)
2171 {
2172   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2173 
2174   return stream->priv->pending;
2175 }
2176 
2177 /**
2178  * g_output_stream_set_pending:
2179  * @stream: a #GOutputStream.
2180  * @error: a #GError location to store the error occurring, or %NULL to
2181  * ignore.
2182  *
2183  * Sets @stream to have actions pending. If the pending flag is
2184  * already set or @stream is closed, it will return %FALSE and set
2185  * @error.
2186  *
2187  * Returns: %TRUE if pending was previously unset and is now set.
2188  **/
2189 gboolean
g_output_stream_set_pending(GOutputStream * stream,GError ** error)2190 g_output_stream_set_pending (GOutputStream *stream,
2191 			     GError **error)
2192 {
2193   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2194 
2195   if (stream->priv->closed)
2196     {
2197       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
2198                            _("Stream is already closed"));
2199       return FALSE;
2200     }
2201 
2202   if (stream->priv->pending)
2203     {
2204       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
2205                            /* Translators: This is an error you get if there is
2206                             * already an operation running against this stream when
2207                             * you try to start one */
2208                            _("Stream has outstanding operation"));
2209       return FALSE;
2210     }
2211 
2212   stream->priv->pending = TRUE;
2213   return TRUE;
2214 }
2215 
2216 /**
2217  * g_output_stream_clear_pending:
2218  * @stream: output stream
2219  *
2220  * Clears the pending flag on @stream.
2221  **/
2222 void
g_output_stream_clear_pending(GOutputStream * stream)2223 g_output_stream_clear_pending (GOutputStream *stream)
2224 {
2225   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
2226 
2227   stream->priv->pending = FALSE;
2228 }
2229 
2230 /*< internal >
2231  * g_output_stream_async_write_is_via_threads:
2232  * @stream: a #GOutputStream.
2233  *
2234  * Checks if an output stream's write_async function uses threads.
2235  *
2236  * Returns: %TRUE if @stream's write_async function uses threads.
2237  **/
2238 gboolean
g_output_stream_async_write_is_via_threads(GOutputStream * stream)2239 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
2240 {
2241   GOutputStreamClass *class;
2242 
2243   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2244 
2245   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2246 
2247   return (class->write_async == g_output_stream_real_write_async &&
2248       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
2249         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
2250 }
2251 
2252 /*< internal >
2253  * g_output_stream_async_writev_is_via_threads:
2254  * @stream: a #GOutputStream.
2255  *
2256  * Checks if an output stream's writev_async function uses threads.
2257  *
2258  * Returns: %TRUE if @stream's writev_async function uses threads.
2259  **/
2260 gboolean
g_output_stream_async_writev_is_via_threads(GOutputStream * stream)2261 g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
2262 {
2263   GOutputStreamClass *class;
2264 
2265   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2266 
2267   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2268 
2269   return (class->writev_async == g_output_stream_real_writev_async &&
2270       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
2271         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
2272 }
2273 
2274 /*< internal >
2275  * g_output_stream_async_close_is_via_threads:
2276  * @stream: output stream
2277  *
2278  * Checks if an output stream's close_async function uses threads.
2279  *
2280  * Returns: %TRUE if @stream's close_async function uses threads.
2281  **/
2282 gboolean
g_output_stream_async_close_is_via_threads(GOutputStream * stream)2283 g_output_stream_async_close_is_via_threads (GOutputStream *stream)
2284 {
2285   GOutputStreamClass *class;
2286 
2287   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2288 
2289   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2290 
2291   return class->close_async == g_output_stream_real_close_async;
2292 }
2293 
2294 /********************************************
2295  *   Default implementation of sync ops    *
2296  ********************************************/
2297 static gboolean
g_output_stream_real_writev(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,gsize * bytes_written,GCancellable * cancellable,GError ** error)2298 g_output_stream_real_writev (GOutputStream         *stream,
2299                              const GOutputVector   *vectors,
2300                              gsize                  n_vectors,
2301                              gsize                 *bytes_written,
2302                              GCancellable          *cancellable,
2303                              GError               **error)
2304 {
2305   GOutputStreamClass *class;
2306   gsize _bytes_written = 0;
2307   gsize i;
2308   GError *err = NULL;
2309 
2310   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2311 
2312   if (bytes_written)
2313     *bytes_written = 0;
2314 
2315   for (i = 0; i < n_vectors; i++)
2316     {
2317       gssize res = 0;
2318 
2319       /* Would we overflow here? In that case simply return and let the caller
2320        * handle this like a short write */
2321       if (_bytes_written > G_MAXSIZE - vectors[i].size)
2322         break;
2323 
2324       res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
2325 
2326       if (res == -1)
2327         {
2328           /* If we already wrote something  we handle this like a short write
2329            * and assume that on the next call the same error happens again, or
2330            * everything finishes successfully without data loss then
2331            */
2332           if (_bytes_written > 0)
2333             {
2334               if (bytes_written)
2335                 *bytes_written = _bytes_written;
2336 
2337               g_clear_error (&err);
2338               return TRUE;
2339             }
2340 
2341           g_propagate_error (error, err);
2342           return FALSE;
2343         }
2344 
2345       _bytes_written += res;
2346       /* if we had a short write break the loop here */
2347       if ((gsize) res < vectors[i].size)
2348         break;
2349     }
2350 
2351   if (bytes_written)
2352     *bytes_written = _bytes_written;
2353 
2354   return TRUE;
2355 }
2356 
2357 /********************************************
2358  *   Default implementation of async ops    *
2359  ********************************************/
2360 
2361 typedef struct {
2362   const void         *buffer;
2363   gsize               count_requested;
2364   gssize              count_written;
2365 } WriteData;
2366 
2367 static void
free_write_data(WriteData * op)2368 free_write_data (WriteData *op)
2369 {
2370   g_slice_free (WriteData, op);
2371 }
2372 
2373 static void
write_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2374 write_async_thread (GTask        *task,
2375                     gpointer      source_object,
2376                     gpointer      task_data,
2377                     GCancellable *cancellable)
2378 {
2379   GOutputStream *stream = source_object;
2380   WriteData *op = task_data;
2381   GOutputStreamClass *class;
2382   GError *error = NULL;
2383   gssize count_written;
2384 
2385   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2386   count_written = class->write_fn (stream, op->buffer, op->count_requested,
2387                                    cancellable, &error);
2388   if (count_written == -1)
2389     g_task_return_error (task, error);
2390   else
2391     g_task_return_int (task, count_written);
2392 }
2393 
2394 static void write_async_pollable (GPollableOutputStream *stream,
2395                                   GTask                 *task);
2396 
2397 static gboolean
write_async_pollable_ready(GPollableOutputStream * stream,gpointer user_data)2398 write_async_pollable_ready (GPollableOutputStream *stream,
2399 			    gpointer               user_data)
2400 {
2401   GTask *task = user_data;
2402 
2403   write_async_pollable (stream, task);
2404   return FALSE;
2405 }
2406 
2407 static void
write_async_pollable(GPollableOutputStream * stream,GTask * task)2408 write_async_pollable (GPollableOutputStream *stream,
2409                       GTask                 *task)
2410 {
2411   GError *error = NULL;
2412   WriteData *op = g_task_get_task_data (task);
2413   gssize count_written;
2414 
2415   if (g_task_return_error_if_cancelled (task))
2416     return;
2417 
2418   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
2419     write_nonblocking (stream, op->buffer, op->count_requested, &error);
2420 
2421   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
2422     {
2423       GSource *source;
2424 
2425       g_error_free (error);
2426 
2427       source = g_pollable_output_stream_create_source (stream,
2428                                                        g_task_get_cancellable (task));
2429       g_task_attach_source (task, source,
2430                             (GSourceFunc) write_async_pollable_ready);
2431       g_source_unref (source);
2432       return;
2433     }
2434 
2435   if (count_written == -1)
2436     g_task_return_error (task, error);
2437   else
2438     g_task_return_int (task, count_written);
2439 }
2440 
2441 static void
g_output_stream_real_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2442 g_output_stream_real_write_async (GOutputStream       *stream,
2443                                   const void          *buffer,
2444                                   gsize                count,
2445                                   int                  io_priority,
2446                                   GCancellable        *cancellable,
2447                                   GAsyncReadyCallback  callback,
2448                                   gpointer             user_data)
2449 {
2450   GTask *task;
2451   WriteData *op;
2452 
2453   op = g_slice_new0 (WriteData);
2454   task = g_task_new (stream, cancellable, callback, user_data);
2455   g_task_set_check_cancellable (task, FALSE);
2456   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
2457   op->buffer = buffer;
2458   op->count_requested = count;
2459 
2460   if (!g_output_stream_async_write_is_via_threads (stream))
2461     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
2462   else
2463     g_task_run_in_thread (task, write_async_thread);
2464   g_object_unref (task);
2465 }
2466 
2467 static gssize
g_output_stream_real_write_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2468 g_output_stream_real_write_finish (GOutputStream  *stream,
2469                                    GAsyncResult   *result,
2470                                    GError        **error)
2471 {
2472   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2473 
2474   return g_task_propagate_int (G_TASK (result), error);
2475 }
2476 
2477 typedef struct {
2478   const GOutputVector *vectors;
2479   gsize                n_vectors; /* (unowned) */
2480   gsize                bytes_written;
2481 } WritevData;
2482 
2483 static void
free_writev_data(WritevData * op)2484 free_writev_data (WritevData *op)
2485 {
2486   g_slice_free (WritevData, op);
2487 }
2488 
2489 static void
writev_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2490 writev_async_thread (GTask        *task,
2491                      gpointer      source_object,
2492                      gpointer      task_data,
2493                      GCancellable *cancellable)
2494 {
2495   GOutputStream *stream = source_object;
2496   WritevData *op = task_data;
2497   GOutputStreamClass *class;
2498   GError *error = NULL;
2499   gboolean res;
2500 
2501   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2502   res = class->writev_fn (stream, op->vectors, op->n_vectors,
2503                           &op->bytes_written, cancellable, &error);
2504 
2505   g_warn_if_fail (res || op->bytes_written == 0);
2506   g_warn_if_fail (res || error != NULL);
2507 
2508   if (!res)
2509     g_task_return_error (task, g_steal_pointer (&error));
2510   else
2511     g_task_return_boolean (task, TRUE);
2512 }
2513 
2514 static void writev_async_pollable (GPollableOutputStream *stream,
2515                                    GTask                 *task);
2516 
2517 static gboolean
writev_async_pollable_ready(GPollableOutputStream * stream,gpointer user_data)2518 writev_async_pollable_ready (GPollableOutputStream *stream,
2519 			     gpointer               user_data)
2520 {
2521   GTask *task = user_data;
2522 
2523   writev_async_pollable (stream, task);
2524   return G_SOURCE_REMOVE;
2525 }
2526 
2527 static void
writev_async_pollable(GPollableOutputStream * stream,GTask * task)2528 writev_async_pollable (GPollableOutputStream *stream,
2529                        GTask                 *task)
2530 {
2531   GError *error = NULL;
2532   WritevData *op = g_task_get_task_data (task);
2533   GPollableReturn res;
2534   gsize bytes_written = 0;
2535 
2536   if (g_task_return_error_if_cancelled (task))
2537     return;
2538 
2539   res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
2540     writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
2541 
2542   switch (res)
2543     {
2544     case G_POLLABLE_RETURN_WOULD_BLOCK:
2545         {
2546           GSource *source;
2547 
2548           g_warn_if_fail (error == NULL);
2549           g_warn_if_fail (bytes_written == 0);
2550 
2551           source = g_pollable_output_stream_create_source (stream,
2552                                                            g_task_get_cancellable (task));
2553           g_task_attach_source (task, source,
2554                                 (GSourceFunc) writev_async_pollable_ready);
2555           g_source_unref (source);
2556         }
2557         break;
2558       case G_POLLABLE_RETURN_OK:
2559         g_warn_if_fail (error == NULL);
2560         op->bytes_written = bytes_written;
2561         g_task_return_boolean (task, TRUE);
2562         break;
2563       case G_POLLABLE_RETURN_FAILED:
2564         g_warn_if_fail (bytes_written == 0);
2565         g_warn_if_fail (error != NULL);
2566         g_task_return_error (task, g_steal_pointer (&error));
2567         break;
2568       default:
2569         g_assert_not_reached ();
2570     }
2571 }
2572 
2573 static void
g_output_stream_real_writev_async(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2574 g_output_stream_real_writev_async (GOutputStream        *stream,
2575                                    const GOutputVector  *vectors,
2576                                    gsize                 n_vectors,
2577                                    int                   io_priority,
2578                                    GCancellable         *cancellable,
2579                                    GAsyncReadyCallback   callback,
2580                                    gpointer              user_data)
2581 {
2582   GTask *task;
2583   WritevData *op;
2584   GError *error = NULL;
2585 
2586   op = g_slice_new0 (WritevData);
2587   task = g_task_new (stream, cancellable, callback, user_data);
2588   op->vectors = vectors;
2589   op->n_vectors = n_vectors;
2590 
2591   g_task_set_check_cancellable (task, FALSE);
2592   g_task_set_source_tag (task, g_output_stream_writev_async);
2593   g_task_set_priority (task, io_priority);
2594   g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
2595 
2596   if (n_vectors == 0)
2597     {
2598       g_task_return_boolean (task, TRUE);
2599       g_object_unref (task);
2600       return;
2601     }
2602 
2603   if (!g_output_stream_set_pending (stream, &error))
2604     {
2605       g_task_return_error (task, g_steal_pointer (&error));
2606       g_object_unref (task);
2607       return;
2608     }
2609 
2610   if (!g_output_stream_async_writev_is_via_threads (stream))
2611     writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
2612   else
2613     g_task_run_in_thread (task, writev_async_thread);
2614 
2615   g_object_unref (task);
2616 }
2617 
2618 static gboolean
g_output_stream_real_writev_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)2619 g_output_stream_real_writev_finish (GOutputStream   *stream,
2620                                     GAsyncResult    *result,
2621                                     gsize           *bytes_written,
2622                                     GError         **error)
2623 {
2624   GTask *task;
2625 
2626   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2627   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
2628 
2629   g_output_stream_clear_pending (stream);
2630 
2631   task = G_TASK (result);
2632 
2633   if (bytes_written)
2634     {
2635       WritevData *op = g_task_get_task_data (task);
2636 
2637       *bytes_written = op->bytes_written;
2638     }
2639 
2640   return g_task_propagate_boolean (task, error);
2641 }
2642 
2643 typedef struct {
2644   GInputStream *source;
2645   GOutputStreamSpliceFlags flags;
2646   guint istream_closed : 1;
2647   guint ostream_closed : 1;
2648   gssize n_read;
2649   gssize n_written;
2650   gsize bytes_copied;
2651   GError *error;
2652   guint8 *buffer;
2653 } SpliceData;
2654 
2655 static void
free_splice_data(SpliceData * op)2656 free_splice_data (SpliceData *op)
2657 {
2658   g_clear_pointer (&op->buffer, g_free);
2659   g_object_unref (op->source);
2660   g_clear_error (&op->error);
2661   g_free (op);
2662 }
2663 
2664 static void
real_splice_async_complete_cb(GTask * task)2665 real_splice_async_complete_cb (GTask *task)
2666 {
2667   SpliceData *op = g_task_get_task_data (task);
2668 
2669   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
2670       !op->istream_closed)
2671     return;
2672 
2673   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
2674       !op->ostream_closed)
2675     return;
2676 
2677   if (op->error != NULL)
2678     {
2679       g_task_return_error (task, op->error);
2680       op->error = NULL;
2681     }
2682   else
2683     {
2684       g_task_return_int (task, op->bytes_copied);
2685     }
2686 
2687   g_object_unref (task);
2688 }
2689 
2690 static void
real_splice_async_close_input_cb(GObject * source,GAsyncResult * res,gpointer user_data)2691 real_splice_async_close_input_cb (GObject      *source,
2692                                   GAsyncResult *res,
2693                                   gpointer      user_data)
2694 {
2695   GTask *task = user_data;
2696   SpliceData *op = g_task_get_task_data (task);
2697 
2698   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
2699   op->istream_closed = TRUE;
2700 
2701   real_splice_async_complete_cb (task);
2702 }
2703 
2704 static void
real_splice_async_close_output_cb(GObject * source,GAsyncResult * res,gpointer user_data)2705 real_splice_async_close_output_cb (GObject      *source,
2706                                    GAsyncResult *res,
2707                                    gpointer      user_data)
2708 {
2709   GTask *task = G_TASK (user_data);
2710   SpliceData *op = g_task_get_task_data (task);
2711   GError **error = (op->error == NULL) ? &op->error : NULL;
2712 
2713   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
2714   op->ostream_closed = TRUE;
2715 
2716   real_splice_async_complete_cb (task);
2717 }
2718 
2719 static void
real_splice_async_complete(GTask * task)2720 real_splice_async_complete (GTask *task)
2721 {
2722   SpliceData *op = g_task_get_task_data (task);
2723   gboolean done = TRUE;
2724 
2725   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
2726     {
2727       done = FALSE;
2728       g_input_stream_close_async (op->source, g_task_get_priority (task),
2729                                   g_task_get_cancellable (task),
2730                                   real_splice_async_close_input_cb, task);
2731     }
2732 
2733   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
2734     {
2735       done = FALSE;
2736       g_output_stream_internal_close_async (g_task_get_source_object (task),
2737                                             g_task_get_priority (task),
2738                                             g_task_get_cancellable (task),
2739                                             real_splice_async_close_output_cb,
2740                                             task);
2741     }
2742 
2743   if (done)
2744     real_splice_async_complete_cb (task);
2745 }
2746 
2747 static void real_splice_async_read_cb (GObject      *source,
2748                                        GAsyncResult *res,
2749                                        gpointer      user_data);
2750 
2751 static void
real_splice_async_write_cb(GObject * source,GAsyncResult * res,gpointer user_data)2752 real_splice_async_write_cb (GObject      *source,
2753                             GAsyncResult *res,
2754                             gpointer      user_data)
2755 {
2756   GOutputStreamClass *class;
2757   GTask *task = G_TASK (user_data);
2758   SpliceData *op = g_task_get_task_data (task);
2759   gssize ret;
2760 
2761   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
2762 
2763   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
2764 
2765   if (ret == -1)
2766     {
2767       real_splice_async_complete (task);
2768       return;
2769     }
2770 
2771   op->n_written += ret;
2772   op->bytes_copied += ret;
2773   if (op->bytes_copied > G_MAXSSIZE)
2774     op->bytes_copied = G_MAXSSIZE;
2775 
2776   if (op->n_written < op->n_read)
2777     {
2778       class->write_async (g_task_get_source_object (task),
2779                           op->buffer + op->n_written,
2780                           op->n_read - op->n_written,
2781                           g_task_get_priority (task),
2782                           g_task_get_cancellable (task),
2783                           real_splice_async_write_cb, task);
2784       return;
2785     }
2786 
2787   g_input_stream_read_async (op->source, op->buffer, 8192,
2788                              g_task_get_priority (task),
2789                              g_task_get_cancellable (task),
2790                              real_splice_async_read_cb, task);
2791 }
2792 
2793 static void
real_splice_async_read_cb(GObject * source,GAsyncResult * res,gpointer user_data)2794 real_splice_async_read_cb (GObject      *source,
2795                            GAsyncResult *res,
2796                            gpointer      user_data)
2797 {
2798   GOutputStreamClass *class;
2799   GTask *task = G_TASK (user_data);
2800   SpliceData *op = g_task_get_task_data (task);
2801   gssize ret;
2802 
2803   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
2804 
2805   ret = g_input_stream_read_finish (op->source, res, &op->error);
2806   if (ret == -1 || ret == 0)
2807     {
2808       real_splice_async_complete (task);
2809       return;
2810     }
2811 
2812   op->n_read = ret;
2813   op->n_written = 0;
2814 
2815   class->write_async (g_task_get_source_object (task), op->buffer,
2816                       op->n_read, g_task_get_priority (task),
2817                       g_task_get_cancellable (task),
2818                       real_splice_async_write_cb, task);
2819 }
2820 
2821 static void
splice_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2822 splice_async_thread (GTask        *task,
2823                      gpointer      source_object,
2824                      gpointer      task_data,
2825                      GCancellable *cancellable)
2826 {
2827   GOutputStream *stream = source_object;
2828   SpliceData *op = task_data;
2829   GOutputStreamClass *class;
2830   GError *error = NULL;
2831   gssize bytes_copied;
2832 
2833   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2834 
2835   bytes_copied = class->splice (stream,
2836                                 op->source,
2837                                 op->flags,
2838                                 cancellable,
2839                                 &error);
2840   if (bytes_copied == -1)
2841     g_task_return_error (task, error);
2842   else
2843     g_task_return_int (task, bytes_copied);
2844 }
2845 
2846 static void
g_output_stream_real_splice_async(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2847 g_output_stream_real_splice_async (GOutputStream             *stream,
2848                                    GInputStream              *source,
2849                                    GOutputStreamSpliceFlags   flags,
2850                                    int                        io_priority,
2851                                    GCancellable              *cancellable,
2852                                    GAsyncReadyCallback        callback,
2853                                    gpointer                   user_data)
2854 {
2855   GTask *task;
2856   SpliceData *op;
2857 
2858   op = g_new0 (SpliceData, 1);
2859   task = g_task_new (stream, cancellable, callback, user_data);
2860   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
2861   op->flags = flags;
2862   op->source = g_object_ref (source);
2863 
2864   if (g_input_stream_async_read_is_via_threads (source) &&
2865       g_output_stream_async_write_is_via_threads (stream))
2866     {
2867       g_task_run_in_thread (task, splice_async_thread);
2868       g_object_unref (task);
2869     }
2870   else
2871     {
2872       op->buffer = g_malloc (8192);
2873       g_input_stream_read_async (op->source, op->buffer, 8192,
2874                                  g_task_get_priority (task),
2875                                  g_task_get_cancellable (task),
2876                                  real_splice_async_read_cb, task);
2877     }
2878 }
2879 
2880 static gssize
g_output_stream_real_splice_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2881 g_output_stream_real_splice_finish (GOutputStream  *stream,
2882                                     GAsyncResult   *result,
2883                                     GError        **error)
2884 {
2885   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2886 
2887   return g_task_propagate_int (G_TASK (result), error);
2888 }
2889 
2890 
2891 static void
flush_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2892 flush_async_thread (GTask        *task,
2893                     gpointer      source_object,
2894                     gpointer      task_data,
2895                     GCancellable *cancellable)
2896 {
2897   GOutputStream *stream = source_object;
2898   GOutputStreamClass *class;
2899   gboolean result;
2900   GError *error = NULL;
2901 
2902   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2903   result = TRUE;
2904   if (class->flush)
2905     result = class->flush (stream, cancellable, &error);
2906 
2907   if (result)
2908     g_task_return_boolean (task, TRUE);
2909   else
2910     g_task_return_error (task, error);
2911 }
2912 
2913 static void
g_output_stream_real_flush_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2914 g_output_stream_real_flush_async (GOutputStream       *stream,
2915                                   int                  io_priority,
2916                                   GCancellable        *cancellable,
2917                                   GAsyncReadyCallback  callback,
2918                                   gpointer             user_data)
2919 {
2920   GTask *task;
2921 
2922   task = g_task_new (stream, cancellable, callback, user_data);
2923   g_task_set_priority (task, io_priority);
2924   g_task_run_in_thread (task, flush_async_thread);
2925   g_object_unref (task);
2926 }
2927 
2928 static gboolean
g_output_stream_real_flush_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2929 g_output_stream_real_flush_finish (GOutputStream  *stream,
2930                                    GAsyncResult   *result,
2931                                    GError        **error)
2932 {
2933   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2934 
2935   return g_task_propagate_boolean (G_TASK (result), error);
2936 }
2937 
2938 static void
close_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2939 close_async_thread (GTask        *task,
2940                     gpointer      source_object,
2941                     gpointer      task_data,
2942                     GCancellable *cancellable)
2943 {
2944   GOutputStream *stream = source_object;
2945   GOutputStreamClass *class;
2946   GError *error = NULL;
2947   gboolean result = TRUE;
2948 
2949   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2950 
2951   /* Do a flush here if there is a flush function, and we did not have to do
2952    * an async flush before (see g_output_stream_close_async)
2953    */
2954   if (class->flush != NULL &&
2955       (class->flush_async == NULL ||
2956        class->flush_async == g_output_stream_real_flush_async))
2957     {
2958       result = class->flush (stream, cancellable, &error);
2959     }
2960 
2961   /* Auto handling of cancellation disabled, and ignore
2962      cancellation, since we want to close things anyway, although
2963      possibly in a quick-n-dirty way. At least we never want to leak
2964      open handles */
2965 
2966   if (class->close_fn)
2967     {
2968       /* Make sure to close, even if the flush failed (see sync close) */
2969       if (!result)
2970         class->close_fn (stream, cancellable, NULL);
2971       else
2972         result = class->close_fn (stream, cancellable, &error);
2973     }
2974 
2975   if (result)
2976     g_task_return_boolean (task, TRUE);
2977   else
2978     g_task_return_error (task, error);
2979 }
2980 
2981 static void
g_output_stream_real_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2982 g_output_stream_real_close_async (GOutputStream       *stream,
2983                                   int                  io_priority,
2984                                   GCancellable        *cancellable,
2985                                   GAsyncReadyCallback  callback,
2986                                   gpointer             user_data)
2987 {
2988   GTask *task;
2989 
2990   task = g_task_new (stream, cancellable, callback, user_data);
2991   g_task_set_priority (task, io_priority);
2992   g_task_run_in_thread (task, close_async_thread);
2993   g_object_unref (task);
2994 }
2995 
2996 static gboolean
g_output_stream_real_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2997 g_output_stream_real_close_finish (GOutputStream  *stream,
2998                                    GAsyncResult   *result,
2999                                    GError        **error)
3000 {
3001   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
3002 
3003   return g_task_propagate_boolean (G_TASK (result), error);
3004 }
3005