1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 *
18 * Author: Alexander Larsson <alexl@redhat.com>
19 */
20
21 #include "config.h"
22 #include <string.h>
23 #include "goutputstream.h"
24 #include "gcancellable.h"
25 #include "gasyncresult.h"
26 #include "gtask.h"
27 #include "ginputstream.h"
28 #include "gioerror.h"
29 #include "gioprivate.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34 * SECTION:goutputstream
35 * @short_description: Base class for implementing streaming output
36 * @include: gio/gio.h
37 *
38 * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39 * to close a stream (g_output_stream_close()) and to flush pending writes
40 * (g_output_stream_flush()).
41 *
42 * To copy the content of an input stream to an output stream without
43 * manually handling the reads and writes, use g_output_stream_splice().
44 *
45 * See the documentation for #GIOStream for details of thread safety of
46 * streaming APIs.
47 *
48 * All of these functions have async variants too.
49 **/
50
51 struct _GOutputStreamPrivate {
52 guint closed : 1;
53 guint pending : 1;
54 guint closing : 1;
55 GAsyncReadyCallback outstanding_callback;
56 };
57
58 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
59
60 static gssize g_output_stream_real_splice (GOutputStream *stream,
61 GInputStream *source,
62 GOutputStreamSpliceFlags flags,
63 GCancellable *cancellable,
64 GError **error);
65 static void g_output_stream_real_write_async (GOutputStream *stream,
66 const void *buffer,
67 gsize count,
68 int io_priority,
69 GCancellable *cancellable,
70 GAsyncReadyCallback callback,
71 gpointer data);
72 static gssize g_output_stream_real_write_finish (GOutputStream *stream,
73 GAsyncResult *result,
74 GError **error);
75 static gboolean g_output_stream_real_writev (GOutputStream *stream,
76 const GOutputVector *vectors,
77 gsize n_vectors,
78 gsize *bytes_written,
79 GCancellable *cancellable,
80 GError **error);
81 static void g_output_stream_real_writev_async (GOutputStream *stream,
82 const GOutputVector *vectors,
83 gsize n_vectors,
84 int io_priority,
85 GCancellable *cancellable,
86 GAsyncReadyCallback callback,
87 gpointer data);
88 static gboolean g_output_stream_real_writev_finish (GOutputStream *stream,
89 GAsyncResult *result,
90 gsize *bytes_written,
91 GError **error);
92 static void g_output_stream_real_splice_async (GOutputStream *stream,
93 GInputStream *source,
94 GOutputStreamSpliceFlags flags,
95 int io_priority,
96 GCancellable *cancellable,
97 GAsyncReadyCallback callback,
98 gpointer data);
99 static gssize g_output_stream_real_splice_finish (GOutputStream *stream,
100 GAsyncResult *result,
101 GError **error);
102 static void g_output_stream_real_flush_async (GOutputStream *stream,
103 int io_priority,
104 GCancellable *cancellable,
105 GAsyncReadyCallback callback,
106 gpointer data);
107 static gboolean g_output_stream_real_flush_finish (GOutputStream *stream,
108 GAsyncResult *result,
109 GError **error);
110 static void g_output_stream_real_close_async (GOutputStream *stream,
111 int io_priority,
112 GCancellable *cancellable,
113 GAsyncReadyCallback callback,
114 gpointer data);
115 static gboolean g_output_stream_real_close_finish (GOutputStream *stream,
116 GAsyncResult *result,
117 GError **error);
118 static gboolean g_output_stream_internal_close (GOutputStream *stream,
119 GCancellable *cancellable,
120 GError **error);
121 static void g_output_stream_internal_close_async (GOutputStream *stream,
122 int io_priority,
123 GCancellable *cancellable,
124 GAsyncReadyCallback callback,
125 gpointer data);
126 static gboolean g_output_stream_internal_close_finish (GOutputStream *stream,
127 GAsyncResult *result,
128 GError **error);
129
130 static void
g_output_stream_dispose(GObject * object)131 g_output_stream_dispose (GObject *object)
132 {
133 GOutputStream *stream;
134
135 stream = G_OUTPUT_STREAM (object);
136
137 if (!stream->priv->closed)
138 g_output_stream_close (stream, NULL, NULL);
139
140 G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
141 }
142
143 static void
g_output_stream_class_init(GOutputStreamClass * klass)144 g_output_stream_class_init (GOutputStreamClass *klass)
145 {
146 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
147
148 gobject_class->dispose = g_output_stream_dispose;
149
150 klass->splice = g_output_stream_real_splice;
151
152 klass->write_async = g_output_stream_real_write_async;
153 klass->write_finish = g_output_stream_real_write_finish;
154 klass->writev_fn = g_output_stream_real_writev;
155 klass->writev_async = g_output_stream_real_writev_async;
156 klass->writev_finish = g_output_stream_real_writev_finish;
157 klass->splice_async = g_output_stream_real_splice_async;
158 klass->splice_finish = g_output_stream_real_splice_finish;
159 klass->flush_async = g_output_stream_real_flush_async;
160 klass->flush_finish = g_output_stream_real_flush_finish;
161 klass->close_async = g_output_stream_real_close_async;
162 klass->close_finish = g_output_stream_real_close_finish;
163 }
164
165 static void
g_output_stream_init(GOutputStream * stream)166 g_output_stream_init (GOutputStream *stream)
167 {
168 stream->priv = g_output_stream_get_instance_private (stream);
169 }
170
171 /**
172 * g_output_stream_write:
173 * @stream: a #GOutputStream.
174 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
175 * @count: the number of bytes to write
176 * @cancellable: (nullable): optional cancellable object
177 * @error: location to store the error occurring, or %NULL to ignore
178 *
179 * Tries to write @count bytes from @buffer into the stream. Will block
180 * during the operation.
181 *
182 * If count is 0, returns 0 and does nothing. A value of @count
183 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
184 *
185 * On success, the number of bytes written to the stream is returned.
186 * It is not an error if this is not the same as the requested size, as it
187 * can happen e.g. on a partial I/O error, or if there is not enough
188 * storage in the stream. All writes block until at least one byte
189 * is written or an error occurs; 0 is never returned (unless
190 * @count is 0).
191 *
192 * If @cancellable is not %NULL, then the operation can be cancelled by
193 * triggering the cancellable object from another thread. If the operation
194 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
195 * operation was partially finished when the operation was cancelled the
196 * partial result will be returned, without an error.
197 *
198 * On error -1 is returned and @error is set accordingly.
199 *
200 * Virtual: write_fn
201 *
202 * Returns: Number of bytes written, or -1 on error
203 **/
204 gssize
g_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)205 g_output_stream_write (GOutputStream *stream,
206 const void *buffer,
207 gsize count,
208 GCancellable *cancellable,
209 GError **error)
210 {
211 GOutputStreamClass *class;
212 gssize res;
213
214 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
215 g_return_val_if_fail (buffer != NULL, 0);
216
217 if (count == 0)
218 return 0;
219
220 if (((gssize) count) < 0)
221 {
222 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
223 _("Too large count value passed to %s"), G_STRFUNC);
224 return -1;
225 }
226
227 class = G_OUTPUT_STREAM_GET_CLASS (stream);
228
229 if (class->write_fn == NULL)
230 {
231 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
232 _("Output stream doesn’t implement write"));
233 return -1;
234 }
235
236 if (!g_output_stream_set_pending (stream, error))
237 return -1;
238
239 if (cancellable)
240 g_cancellable_push_current (cancellable);
241
242 res = class->write_fn (stream, buffer, count, cancellable, error);
243
244 if (cancellable)
245 g_cancellable_pop_current (cancellable);
246
247 g_output_stream_clear_pending (stream);
248
249 return res;
250 }
251
252 /**
253 * g_output_stream_write_all:
254 * @stream: a #GOutputStream.
255 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
256 * @count: the number of bytes to write
257 * @bytes_written: (out) (optional): location to store the number of bytes that was
258 * written to the stream
259 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
260 * @error: location to store the error occurring, or %NULL to ignore
261 *
262 * Tries to write @count bytes from @buffer into the stream. Will block
263 * during the operation.
264 *
265 * This function is similar to g_output_stream_write(), except it tries to
266 * write as many bytes as requested, only stopping on an error.
267 *
268 * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
269 * is set to @count.
270 *
271 * If there is an error during the operation %FALSE is returned and @error
272 * is set to indicate the error status.
273 *
274 * As a special exception to the normal conventions for functions that
275 * use #GError, if this function returns %FALSE (and sets @error) then
276 * @bytes_written will be set to the number of bytes that were
277 * successfully written before the error was encountered. This
278 * functionality is only available from C. If you need it from another
279 * language then you must write your own loop around
280 * g_output_stream_write().
281 *
282 * Returns: %TRUE on success, %FALSE if there was an error
283 **/
284 gboolean
g_output_stream_write_all(GOutputStream * stream,const void * buffer,gsize count,gsize * bytes_written,GCancellable * cancellable,GError ** error)285 g_output_stream_write_all (GOutputStream *stream,
286 const void *buffer,
287 gsize count,
288 gsize *bytes_written,
289 GCancellable *cancellable,
290 GError **error)
291 {
292 gsize _bytes_written;
293 gssize res;
294
295 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
296 g_return_val_if_fail (buffer != NULL, FALSE);
297
298 _bytes_written = 0;
299 while (_bytes_written < count)
300 {
301 res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
302 cancellable, error);
303 if (res == -1)
304 {
305 if (bytes_written)
306 *bytes_written = _bytes_written;
307 return FALSE;
308 }
309 g_return_val_if_fail (res > 0, FALSE);
310
311 _bytes_written += res;
312 }
313
314 if (bytes_written)
315 *bytes_written = _bytes_written;
316
317 return TRUE;
318 }
319
320 /**
321 * g_output_stream_writev:
322 * @stream: a #GOutputStream.
323 * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
324 * @n_vectors: the number of vectors to write
325 * @bytes_written: (out) (optional): location to store the number of bytes that were
326 * written to the stream
327 * @cancellable: (nullable): optional cancellable object
328 * @error: location to store the error occurring, or %NULL to ignore
329 *
330 * Tries to write the bytes contained in the @n_vectors @vectors into the
331 * stream. Will block during the operation.
332 *
333 * If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
334 * does nothing.
335 *
 * On success, %TRUE is returned and the number of bytes written to the
 * stream is stored in @bytes_written.
337 * It is not an error if this is not the same as the requested size, as it
338 * can happen e.g. on a partial I/O error, or if there is not enough
339 * storage in the stream. All writes block until at least one byte
340 * is written or an error occurs; 0 is never returned (unless
341 * @n_vectors is 0 or the sum of all bytes in @vectors is 0).
342 *
343 * If @cancellable is not %NULL, then the operation can be cancelled by
344 * triggering the cancellable object from another thread. If the operation
345 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
346 * operation was partially finished when the operation was cancelled the
347 * partial result will be returned, without an error.
348 *
349 * Some implementations of g_output_stream_writev() may have limitations on the
350 * aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
351 * are exceeded. For example, when writing to a local file on UNIX platforms,
352 * the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
353 *
354 * Virtual: writev_fn
355 *
356 * Returns: %TRUE on success, %FALSE if there was an error
357 *
358 * Since: 2.60
359 */
360 gboolean
g_output_stream_writev(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,gsize * bytes_written,GCancellable * cancellable,GError ** error)361 g_output_stream_writev (GOutputStream *stream,
362 const GOutputVector *vectors,
363 gsize n_vectors,
364 gsize *bytes_written,
365 GCancellable *cancellable,
366 GError **error)
367 {
368 GOutputStreamClass *class;
369 gboolean res;
370 gsize _bytes_written = 0;
371
372 if (bytes_written)
373 *bytes_written = 0;
374
375 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
376 g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
377 g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
378 g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
379
380 if (n_vectors == 0)
381 return TRUE;
382
383 class = G_OUTPUT_STREAM_GET_CLASS (stream);
384
385 g_return_val_if_fail (class->writev_fn != NULL, FALSE);
386
387 if (!g_output_stream_set_pending (stream, error))
388 return FALSE;
389
390 if (cancellable)
391 g_cancellable_push_current (cancellable);
392
393 res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
394
395 g_warn_if_fail (res || _bytes_written == 0);
396 g_warn_if_fail (res || (error == NULL || *error != NULL));
397
398 if (cancellable)
399 g_cancellable_pop_current (cancellable);
400
401 g_output_stream_clear_pending (stream);
402
403 if (bytes_written)
404 *bytes_written = _bytes_written;
405
406 return res;
407 }
408
409 /**
410 * g_output_stream_writev_all:
411 * @stream: a #GOutputStream.
412 * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
413 * @n_vectors: the number of vectors to write
414 * @bytes_written: (out) (optional): location to store the number of bytes that were
415 * written to the stream
416 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
417 * @error: location to store the error occurring, or %NULL to ignore
418 *
419 * Tries to write the bytes contained in the @n_vectors @vectors into the
420 * stream. Will block during the operation.
421 *
422 * This function is similar to g_output_stream_writev(), except it tries to
423 * write as many bytes as requested, only stopping on an error.
424 *
425 * On a successful write of all @n_vectors vectors, %TRUE is returned, and
426 * @bytes_written is set to the sum of all the sizes of @vectors.
427 *
428 * If there is an error during the operation %FALSE is returned and @error
429 * is set to indicate the error status.
430 *
431 * As a special exception to the normal conventions for functions that
432 * use #GError, if this function returns %FALSE (and sets @error) then
433 * @bytes_written will be set to the number of bytes that were
434 * successfully written before the error was encountered. This
435 * functionality is only available from C. If you need it from another
436 * language then you must write your own loop around
437 * g_output_stream_write().
438 *
439 * The content of the individual elements of @vectors might be changed by this
440 * function.
441 *
442 * Returns: %TRUE on success, %FALSE if there was an error
443 *
444 * Since: 2.60
445 */
446 gboolean
g_output_stream_writev_all(GOutputStream * stream,GOutputVector * vectors,gsize n_vectors,gsize * bytes_written,GCancellable * cancellable,GError ** error)447 g_output_stream_writev_all (GOutputStream *stream,
448 GOutputVector *vectors,
449 gsize n_vectors,
450 gsize *bytes_written,
451 GCancellable *cancellable,
452 GError **error)
453 {
454 gsize _bytes_written = 0;
455 gsize i, to_be_written = 0;
456
457 if (bytes_written)
458 *bytes_written = 0;
459
460 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
461 g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
462 g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
463 g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
464
465 /* We can't write more than G_MAXSIZE bytes overall, otherwise we
466 * would overflow the bytes_written counter */
467 for (i = 0; i < n_vectors; i++)
468 {
469 if (to_be_written > G_MAXSIZE - vectors[i].size)
470 {
471 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
472 _("Sum of vectors passed to %s too large"), G_STRFUNC);
473 return FALSE;
474 }
475 to_be_written += vectors[i].size;
476 }
477
478 _bytes_written = 0;
479 while (n_vectors > 0 && to_be_written > 0)
480 {
481 gsize n_written = 0;
482 gboolean res;
483
484 res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
485
486 if (!res)
487 {
488 if (bytes_written)
489 *bytes_written = _bytes_written;
490 return FALSE;
491 }
492
493 g_return_val_if_fail (n_written > 0, FALSE);
494 _bytes_written += n_written;
495
496 /* skip vectors that have been written in full */
497 while (n_vectors > 0 && n_written >= vectors[0].size)
498 {
499 n_written -= vectors[0].size;
500 ++vectors;
501 --n_vectors;
502 }
503 /* skip partially written vector data */
504 if (n_written > 0 && n_vectors > 0)
505 {
506 vectors[0].size -= n_written;
507 vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
508 }
509 }
510
511 if (bytes_written)
512 *bytes_written = _bytes_written;
513
514 return TRUE;
515 }
516
517 /**
518 * g_output_stream_printf:
519 * @stream: a #GOutputStream.
520 * @bytes_written: (out) (optional): location to store the number of bytes that was
521 * written to the stream
522 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
523 * @error: location to store the error occurring, or %NULL to ignore
524 * @format: the format string. See the printf() documentation
525 * @...: the parameters to insert into the format string
526 *
527 * This is a utility function around g_output_stream_write_all(). It
528 * uses g_strdup_vprintf() to turn @format and @... into a string that
529 * is then written to @stream.
530 *
531 * See the documentation of g_output_stream_write_all() about the
532 * behavior of the actual write operation.
533 *
 * Note that partial writes cannot be properly checked with this
 * function due to the variable length of the written string. If you
 * need precise control over partial write failures, you need to
 * create your own printf()-like wrapper around g_output_stream_write()
 * or g_output_stream_write_all().
539 *
540 * Since: 2.40
541 *
542 * Returns: %TRUE on success, %FALSE if there was an error
543 **/
544 gboolean
g_output_stream_printf(GOutputStream * stream,gsize * bytes_written,GCancellable * cancellable,GError ** error,const gchar * format,...)545 g_output_stream_printf (GOutputStream *stream,
546 gsize *bytes_written,
547 GCancellable *cancellable,
548 GError **error,
549 const gchar *format,
550 ...)
551 {
552 va_list args;
553 gboolean success;
554
555 va_start (args, format);
556 success = g_output_stream_vprintf (stream, bytes_written, cancellable,
557 error, format, args);
558 va_end (args);
559
560 return success;
561 }
562
563 /**
564 * g_output_stream_vprintf:
565 * @stream: a #GOutputStream.
566 * @bytes_written: (out) (optional): location to store the number of bytes that was
567 * written to the stream
568 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
569 * @error: location to store the error occurring, or %NULL to ignore
570 * @format: the format string. See the printf() documentation
571 * @args: the parameters to insert into the format string
572 *
573 * This is a utility function around g_output_stream_write_all(). It
574 * uses g_strdup_vprintf() to turn @format and @args into a string that
575 * is then written to @stream.
576 *
577 * See the documentation of g_output_stream_write_all() about the
578 * behavior of the actual write operation.
579 *
 * Note that partial writes cannot be properly checked with this
 * function due to the variable length of the written string. If you
 * need precise control over partial write failures, you need to
 * create your own printf()-like wrapper around g_output_stream_write()
 * or g_output_stream_write_all().
585 *
586 * Since: 2.40
587 *
588 * Returns: %TRUE on success, %FALSE if there was an error
589 **/
590 gboolean
g_output_stream_vprintf(GOutputStream * stream,gsize * bytes_written,GCancellable * cancellable,GError ** error,const gchar * format,va_list args)591 g_output_stream_vprintf (GOutputStream *stream,
592 gsize *bytes_written,
593 GCancellable *cancellable,
594 GError **error,
595 const gchar *format,
596 va_list args)
597 {
598 gchar *text;
599 gboolean success;
600
601 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
602 g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
603 g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
604 g_return_val_if_fail (format != NULL, FALSE);
605
606 text = g_strdup_vprintf (format, args);
607 success = g_output_stream_write_all (stream,
608 text, strlen (text),
609 bytes_written, cancellable, error);
610 g_free (text);
611
612 return success;
613 }
614
615 /**
616 * g_output_stream_write_bytes:
617 * @stream: a #GOutputStream.
618 * @bytes: the #GBytes to write
619 * @cancellable: (nullable): optional cancellable object
620 * @error: location to store the error occurring, or %NULL to ignore
621 *
622 * A wrapper function for g_output_stream_write() which takes a
623 * #GBytes as input. This can be more convenient for use by language
624 * bindings or in other cases where the refcounted nature of #GBytes
625 * is helpful over a bare pointer interface.
626 *
627 * However, note that this function may still perform partial writes,
628 * just like g_output_stream_write(). If that occurs, to continue
629 * writing, you will need to create a new #GBytes containing just the
630 * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
631 * #GBytes instance multiple times potentially can result in duplicated
632 * data in the output stream.
633 *
634 * Returns: Number of bytes written, or -1 on error
635 **/
636 gssize
g_output_stream_write_bytes(GOutputStream * stream,GBytes * bytes,GCancellable * cancellable,GError ** error)637 g_output_stream_write_bytes (GOutputStream *stream,
638 GBytes *bytes,
639 GCancellable *cancellable,
640 GError **error)
641 {
642 gsize size;
643 gconstpointer data;
644
645 data = g_bytes_get_data (bytes, &size);
646
647 return g_output_stream_write (stream,
648 data, size,
649 cancellable,
650 error);
651 }
652
653 /**
654 * g_output_stream_flush:
655 * @stream: a #GOutputStream.
656 * @cancellable: (nullable): optional cancellable object
657 * @error: location to store the error occurring, or %NULL to ignore
658 *
659 * Forces a write of all user-space buffered data for the given
660 * @stream. Will block during the operation. Closing the stream will
661 * implicitly cause a flush.
662 *
663 * This function is optional for inherited classes.
664 *
665 * If @cancellable is not %NULL, then the operation can be cancelled by
666 * triggering the cancellable object from another thread. If the operation
667 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
668 *
669 * Returns: %TRUE on success, %FALSE on error
670 **/
671 gboolean
g_output_stream_flush(GOutputStream * stream,GCancellable * cancellable,GError ** error)672 g_output_stream_flush (GOutputStream *stream,
673 GCancellable *cancellable,
674 GError **error)
675 {
676 GOutputStreamClass *class;
677 gboolean res;
678
679 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
680
681 if (!g_output_stream_set_pending (stream, error))
682 return FALSE;
683
684 class = G_OUTPUT_STREAM_GET_CLASS (stream);
685
686 res = TRUE;
687 if (class->flush)
688 {
689 if (cancellable)
690 g_cancellable_push_current (cancellable);
691
692 res = class->flush (stream, cancellable, error);
693
694 if (cancellable)
695 g_cancellable_pop_current (cancellable);
696 }
697
698 g_output_stream_clear_pending (stream);
699
700 return res;
701 }
702
703 /**
704 * g_output_stream_splice:
705 * @stream: a #GOutputStream.
706 * @source: a #GInputStream.
707 * @flags: a set of #GOutputStreamSpliceFlags.
708 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
709 * @error: a #GError location to store the error occurring, or %NULL to
710 * ignore.
711 *
712 * Splices an input stream into an output stream.
713 *
714 * Returns: a #gssize containing the size of the data spliced, or
715 * -1 if an error occurred. Note that if the number of bytes
716 * spliced is greater than %G_MAXSSIZE, then that will be
717 * returned, and there is no way to determine the actual number
718 * of bytes spliced.
719 **/
720 gssize
g_output_stream_splice(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,GCancellable * cancellable,GError ** error)721 g_output_stream_splice (GOutputStream *stream,
722 GInputStream *source,
723 GOutputStreamSpliceFlags flags,
724 GCancellable *cancellable,
725 GError **error)
726 {
727 GOutputStreamClass *class;
728 gssize bytes_copied;
729
730 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
731 g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
732
733 if (g_input_stream_is_closed (source))
734 {
735 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
736 _("Source stream is already closed"));
737 return -1;
738 }
739
740 if (!g_output_stream_set_pending (stream, error))
741 return -1;
742
743 class = G_OUTPUT_STREAM_GET_CLASS (stream);
744
745 if (cancellable)
746 g_cancellable_push_current (cancellable);
747
748 bytes_copied = class->splice (stream, source, flags, cancellable, error);
749
750 if (cancellable)
751 g_cancellable_pop_current (cancellable);
752
753 g_output_stream_clear_pending (stream);
754
755 return bytes_copied;
756 }
757
758 static gssize
g_output_stream_real_splice(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,GCancellable * cancellable,GError ** error)759 g_output_stream_real_splice (GOutputStream *stream,
760 GInputStream *source,
761 GOutputStreamSpliceFlags flags,
762 GCancellable *cancellable,
763 GError **error)
764 {
765 GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
766 gssize n_read, n_written;
767 gsize bytes_copied;
768 char buffer[8192], *p;
769 gboolean res;
770
771 bytes_copied = 0;
772 if (class->write_fn == NULL)
773 {
774 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
775 _("Output stream doesn’t implement write"));
776 res = FALSE;
777 goto notsupported;
778 }
779
780 res = TRUE;
781 do
782 {
783 n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
784 if (n_read == -1)
785 {
786 res = FALSE;
787 break;
788 }
789
790 if (n_read == 0)
791 break;
792
793 p = buffer;
794 while (n_read > 0)
795 {
796 n_written = class->write_fn (stream, p, n_read, cancellable, error);
797 if (n_written == -1)
798 {
799 res = FALSE;
800 break;
801 }
802
803 p += n_written;
804 n_read -= n_written;
805 bytes_copied += n_written;
806 }
807
808 if (bytes_copied > G_MAXSSIZE)
809 bytes_copied = G_MAXSSIZE;
810 }
811 while (res);
812
813 notsupported:
814 if (!res)
815 error = NULL; /* Ignore further errors */
816
817 if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
818 {
819 /* Don't care about errors in source here */
820 g_input_stream_close (source, cancellable, NULL);
821 }
822
823 if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
824 {
825 /* But write errors on close are bad! */
826 if (!g_output_stream_internal_close (stream, cancellable, error))
827 res = FALSE;
828 }
829
830 if (res)
831 return bytes_copied;
832
833 return -1;
834 }
835
836 /* Must always be called inside
837 * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
838 static gboolean
g_output_stream_internal_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)839 g_output_stream_internal_close (GOutputStream *stream,
840 GCancellable *cancellable,
841 GError **error)
842 {
843 GOutputStreamClass *class;
844 gboolean res;
845
846 if (stream->priv->closed)
847 return TRUE;
848
849 class = G_OUTPUT_STREAM_GET_CLASS (stream);
850
851 stream->priv->closing = TRUE;
852
853 if (cancellable)
854 g_cancellable_push_current (cancellable);
855
856 if (class->flush)
857 res = class->flush (stream, cancellable, error);
858 else
859 res = TRUE;
860
861 if (!res)
862 {
863 /* flushing caused the error that we want to return,
864 * but we still want to close the underlying stream if possible
865 */
866 if (class->close_fn)
867 class->close_fn (stream, cancellable, NULL);
868 }
869 else
870 {
871 res = TRUE;
872 if (class->close_fn)
873 res = class->close_fn (stream, cancellable, error);
874 }
875
876 if (cancellable)
877 g_cancellable_pop_current (cancellable);
878
879 stream->priv->closing = FALSE;
880 stream->priv->closed = TRUE;
881
882 return res;
883 }
884
885 /**
886 * g_output_stream_close:
887 * @stream: A #GOutputStream.
888 * @cancellable: (nullable): optional cancellable object
889 * @error: location to store the error occurring, or %NULL to ignore
890 *
891 * Closes the stream, releasing resources related to it.
892 *
893 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
894 * Closing a stream multiple times will not return an error.
895 *
896 * Closing a stream will automatically flush any outstanding buffers in the
897 * stream.
898 *
899 * Streams will be automatically closed when the last reference
900 * is dropped, but you might want to call this function to make sure
901 * resources are released as early as possible.
902 *
903 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
904 * open after the stream is closed. See the documentation for the individual
905 * stream for details.
906 *
907 * On failure the first error that happened will be reported, but the close
908 * operation will finish as much as possible. A stream that failed to
909 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
910 * is important to check and report the error to the user, otherwise
911 * there might be a loss of data as all data might not be written.
912 *
913 * If @cancellable is not %NULL, then the operation can be cancelled by
914 * triggering the cancellable object from another thread. If the operation
915 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
 * Cancelling a close will still leave the stream closed, but some streams
 * can use a faster close that doesn't block to e.g. check errors. On
918 * cancellation (as with any error) there is no guarantee that all written
919 * data will reach the target.
920 *
921 * Returns: %TRUE on success, %FALSE on failure
922 **/
923 gboolean
g_output_stream_close(GOutputStream * stream,GCancellable * cancellable,GError ** error)924 g_output_stream_close (GOutputStream *stream,
925 GCancellable *cancellable,
926 GError **error)
927 {
928 gboolean res;
929
930 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
931
932 if (stream->priv->closed)
933 return TRUE;
934
935 if (!g_output_stream_set_pending (stream, error))
936 return FALSE;
937
938 res = g_output_stream_internal_close (stream, cancellable, error);
939
940 g_output_stream_clear_pending (stream);
941
942 return res;
943 }
944
945 static void
async_ready_write_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)946 async_ready_write_callback_wrapper (GObject *source_object,
947 GAsyncResult *res,
948 gpointer user_data)
949 {
950 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
951 GOutputStreamClass *class;
952 GTask *task = user_data;
953 gssize nwrote;
954 GError *error = NULL;
955
956 g_output_stream_clear_pending (stream);
957
958 if (g_async_result_legacy_propagate_error (res, &error))
959 nwrote = -1;
960 else
961 {
962 class = G_OUTPUT_STREAM_GET_CLASS (stream);
963 nwrote = class->write_finish (stream, res, &error);
964 }
965
966 if (nwrote >= 0)
967 g_task_return_int (task, nwrote);
968 else
969 g_task_return_error (task, error);
970 g_object_unref (task);
971 }
972
973 /**
974 * g_output_stream_write_async:
975 * @stream: A #GOutputStream.
976 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
977 * @count: the number of bytes to write
978 * @io_priority: the io priority of the request.
979 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
980 * @callback: (scope async): callback to call when the request is satisfied
981 * @user_data: (closure): the data to pass to callback function
982 *
983 * Request an asynchronous write of @count bytes from @buffer into
984 * the stream. When the operation is finished @callback will be called.
985 * You can then call g_output_stream_write_finish() to get the result of the
986 * operation.
987 *
988 * During an async request no other sync and async calls are allowed,
989 * and will result in %G_IO_ERROR_PENDING errors.
990 *
991 * A value of @count larger than %G_MAXSSIZE will cause a
992 * %G_IO_ERROR_INVALID_ARGUMENT error.
993 *
994 * On success, the number of bytes written will be passed to the
995 * @callback. It is not an error if this is not the same as the
996 * requested size, as it can happen e.g. on a partial I/O error,
997 * but generally we try to write as many bytes as requested.
998 *
999 * You are guaranteed that this method will never fail with
1000 * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
1001 * method will just wait until this changes.
1002 *
1003 * Any outstanding I/O request with higher priority (lower numerical
1004 * value) will be executed before an outstanding request with lower
1005 * priority. Default priority is %G_PRIORITY_DEFAULT.
1006 *
1007 * The asynchronous methods have a default fallback that uses threads
1008 * to implement asynchronicity, so they are optional for inheriting
1009 * classes. However, if you override one you must override all.
1010 *
1011 * For the synchronous, blocking version of this function, see
1012 * g_output_stream_write().
1013 *
1014 * Note that no copy of @buffer will be made, so it must stay valid
1015 * until @callback is called. See g_output_stream_write_bytes_async()
1016 * for a #GBytes version that will automatically hold a reference to
1017 * the contents (without copying) for the duration of the call.
1018 */
1019 void
g_output_stream_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1020 g_output_stream_write_async (GOutputStream *stream,
1021 const void *buffer,
1022 gsize count,
1023 int io_priority,
1024 GCancellable *cancellable,
1025 GAsyncReadyCallback callback,
1026 gpointer user_data)
1027 {
1028 GOutputStreamClass *class;
1029 GError *error = NULL;
1030 GTask *task;
1031
1032 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1033 g_return_if_fail (buffer != NULL);
1034
1035 task = g_task_new (stream, cancellable, callback, user_data);
1036 g_task_set_source_tag (task, g_output_stream_write_async);
1037 g_task_set_priority (task, io_priority);
1038
1039 if (count == 0)
1040 {
1041 g_task_return_int (task, 0);
1042 g_object_unref (task);
1043 return;
1044 }
1045
1046 if (((gssize) count) < 0)
1047 {
1048 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1049 _("Too large count value passed to %s"),
1050 G_STRFUNC);
1051 g_object_unref (task);
1052 return;
1053 }
1054
1055 if (!g_output_stream_set_pending (stream, &error))
1056 {
1057 g_task_return_error (task, error);
1058 g_object_unref (task);
1059 return;
1060 }
1061
1062 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1063
1064 class->write_async (stream, buffer, count, io_priority, cancellable,
1065 async_ready_write_callback_wrapper, task);
1066 }
1067
1068 /**
1069 * g_output_stream_write_finish:
1070 * @stream: a #GOutputStream.
1071 * @result: a #GAsyncResult.
1072 * @error: a #GError location to store the error occurring, or %NULL to
1073 * ignore.
1074 *
1075 * Finishes a stream write operation.
1076 *
1077 * Returns: a #gssize containing the number of bytes written to the stream.
1078 **/
1079 gssize
g_output_stream_write_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1080 g_output_stream_write_finish (GOutputStream *stream,
1081 GAsyncResult *result,
1082 GError **error)
1083 {
1084 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1085 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1086 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
1087
1088 /* @result is always the GTask created by g_output_stream_write_async();
1089 * we called class->write_finish() from async_ready_write_callback_wrapper.
1090 */
1091 return g_task_propagate_int (G_TASK (result), error);
1092 }
1093
/* Per-operation state for g_output_stream_write_all_async(), stored as
 * the GTask's task data and released by free_async_write_all(). */
typedef struct
{
  const guint8 *buffer;   /* start of the caller's buffer; not freed with this struct */
  gsize to_write;         /* bytes still to be written */
  gsize bytes_written;    /* bytes written so far; reported by the _finish function */
} AsyncWriteAll;
1100
1101 static void
free_async_write_all(gpointer data)1102 free_async_write_all (gpointer data)
1103 {
1104 g_slice_free (AsyncWriteAll, data);
1105 }
1106
1107 static void
write_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)1108 write_all_callback (GObject *stream,
1109 GAsyncResult *result,
1110 gpointer user_data)
1111 {
1112 GTask *task = user_data;
1113 AsyncWriteAll *data = g_task_get_task_data (task);
1114
1115 if (result)
1116 {
1117 GError *error = NULL;
1118 gssize nwritten;
1119
1120 nwritten = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), result, &error);
1121
1122 if (nwritten == -1)
1123 {
1124 g_task_return_error (task, error);
1125 g_object_unref (task);
1126 return;
1127 }
1128
1129 g_assert_cmpint (nwritten, <=, data->to_write);
1130 g_warn_if_fail (nwritten > 0);
1131
1132 data->to_write -= nwritten;
1133 data->bytes_written += nwritten;
1134 }
1135
1136 if (data->to_write == 0)
1137 {
1138 g_task_return_boolean (task, TRUE);
1139 g_object_unref (task);
1140 }
1141 else
1142 g_output_stream_write_async (G_OUTPUT_STREAM (stream),
1143 data->buffer + data->bytes_written,
1144 data->to_write,
1145 g_task_get_priority (task),
1146 g_task_get_cancellable (task),
1147 write_all_callback, task);
1148 }
1149
1150 static void
write_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1151 write_all_async_thread (GTask *task,
1152 gpointer source_object,
1153 gpointer task_data,
1154 GCancellable *cancellable)
1155 {
1156 GOutputStream *stream = source_object;
1157 AsyncWriteAll *data = task_data;
1158 GError *error = NULL;
1159
1160 if (g_output_stream_write_all (stream, data->buffer, data->to_write, &data->bytes_written,
1161 g_task_get_cancellable (task), &error))
1162 g_task_return_boolean (task, TRUE);
1163 else
1164 g_task_return_error (task, error);
1165 }
1166
1167 /**
1168 * g_output_stream_write_all_async:
1169 * @stream: A #GOutputStream
1170 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write
1171 * @count: the number of bytes to write
1172 * @io_priority: the io priority of the request
1173 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
1174 * @callback: (scope async): callback to call when the request is satisfied
1175 * @user_data: (closure): the data to pass to callback function
1176 *
1177 * Request an asynchronous write of @count bytes from @buffer into
1178 * the stream. When the operation is finished @callback will be called.
1179 * You can then call g_output_stream_write_all_finish() to get the result of the
1180 * operation.
1181 *
1182 * This is the asynchronous version of g_output_stream_write_all().
1183 *
1184 * Call g_output_stream_write_all_finish() to collect the result.
1185 *
1186 * Any outstanding I/O request with higher priority (lower numerical
1187 * value) will be executed before an outstanding request with lower
1188 * priority. Default priority is %G_PRIORITY_DEFAULT.
1189 *
1190 * Note that no copy of @buffer will be made, so it must stay valid
1191 * until @callback is called.
1192 *
1193 * Since: 2.44
1194 */
1195 void
g_output_stream_write_all_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1196 g_output_stream_write_all_async (GOutputStream *stream,
1197 const void *buffer,
1198 gsize count,
1199 int io_priority,
1200 GCancellable *cancellable,
1201 GAsyncReadyCallback callback,
1202 gpointer user_data)
1203 {
1204 AsyncWriteAll *data;
1205 GTask *task;
1206
1207 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1208 g_return_if_fail (buffer != NULL || count == 0);
1209
1210 task = g_task_new (stream, cancellable, callback, user_data);
1211 data = g_slice_new0 (AsyncWriteAll);
1212 data->buffer = buffer;
1213 data->to_write = count;
1214
1215 g_task_set_source_tag (task, g_output_stream_write_all_async);
1216 g_task_set_task_data (task, data, free_async_write_all);
1217 g_task_set_priority (task, io_priority);
1218
1219 /* If async writes are going to be handled via the threadpool anyway
1220 * then we may as well do it with a single dispatch instead of
1221 * bouncing in and out.
1222 */
1223 if (g_output_stream_async_write_is_via_threads (stream))
1224 {
1225 g_task_run_in_thread (task, write_all_async_thread);
1226 g_object_unref (task);
1227 }
1228 else
1229 write_all_callback (G_OBJECT (stream), NULL, task);
1230 }
1231
1232 /**
1233 * g_output_stream_write_all_finish:
1234 * @stream: a #GOutputStream
1235 * @result: a #GAsyncResult
 * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1237 * @error: a #GError location to store the error occurring, or %NULL to ignore.
1238 *
1239 * Finishes an asynchronous stream write operation started with
1240 * g_output_stream_write_all_async().
1241 *
1242 * As a special exception to the normal conventions for functions that
1243 * use #GError, if this function returns %FALSE (and sets @error) then
1244 * @bytes_written will be set to the number of bytes that were
1245 * successfully written before the error was encountered. This
1246 * functionality is only available from C. If you need it from another
1247 * language then you must write your own loop around
1248 * g_output_stream_write_async().
1249 *
1250 * Returns: %TRUE on success, %FALSE if there was an error
1251 *
1252 * Since: 2.44
1253 **/
1254 gboolean
g_output_stream_write_all_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)1255 g_output_stream_write_all_finish (GOutputStream *stream,
1256 GAsyncResult *result,
1257 gsize *bytes_written,
1258 GError **error)
1259 {
1260 GTask *task;
1261
1262 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1263 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1264
1265 task = G_TASK (result);
1266
1267 if (bytes_written)
1268 {
1269 AsyncWriteAll *data = (AsyncWriteAll *)g_task_get_task_data (task);
1270
1271 *bytes_written = data->bytes_written;
1272 }
1273
1274 return g_task_propagate_boolean (task, error);
1275 }
1276
1277 /**
1278 * g_output_stream_writev_async:
1279 * @stream: A #GOutputStream.
1280 * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
1281 * @n_vectors: the number of vectors to write
1282 * @io_priority: the I/O priority of the request.
1283 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1284 * @callback: (scope async): callback to call when the request is satisfied
1285 * @user_data: (closure): the data to pass to callback function
1286 *
1287 * Request an asynchronous write of the bytes contained in @n_vectors @vectors into
1288 * the stream. When the operation is finished @callback will be called.
1289 * You can then call g_output_stream_writev_finish() to get the result of the
1290 * operation.
1291 *
1292 * During an async request no other sync and async calls are allowed,
1293 * and will result in %G_IO_ERROR_PENDING errors.
1294 *
1295 * On success, the number of bytes written will be passed to the
1296 * @callback. It is not an error if this is not the same as the
1297 * requested size, as it can happen e.g. on a partial I/O error,
1298 * but generally we try to write as many bytes as requested.
1299 *
1300 * You are guaranteed that this method will never fail with
1301 * %G_IO_ERROR_WOULD_BLOCK — if @stream can't accept more data, the
1302 * method will just wait until this changes.
1303 *
1304 * Any outstanding I/O request with higher priority (lower numerical
1305 * value) will be executed before an outstanding request with lower
1306 * priority. Default priority is %G_PRIORITY_DEFAULT.
1307 *
1308 * The asynchronous methods have a default fallback that uses threads
1309 * to implement asynchronicity, so they are optional for inheriting
1310 * classes. However, if you override one you must override all.
1311 *
1312 * For the synchronous, blocking version of this function, see
1313 * g_output_stream_writev().
1314 *
1315 * Note that no copy of @vectors will be made, so it must stay valid
1316 * until @callback is called.
1317 *
1318 * Since: 2.60
1319 */
1320 void
g_output_stream_writev_async(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1321 g_output_stream_writev_async (GOutputStream *stream,
1322 const GOutputVector *vectors,
1323 gsize n_vectors,
1324 int io_priority,
1325 GCancellable *cancellable,
1326 GAsyncReadyCallback callback,
1327 gpointer user_data)
1328 {
1329 GOutputStreamClass *class;
1330
1331 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1332 g_return_if_fail (vectors != NULL || n_vectors == 0);
1333 g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
1334
1335 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1336 g_return_if_fail (class->writev_async != NULL);
1337
1338 class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
1339 callback, user_data);
1340 }
1341
1342 /**
1343 * g_output_stream_writev_finish:
1344 * @stream: a #GOutputStream.
1345 * @result: a #GAsyncResult.
1346 * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1347 * @error: a #GError location to store the error occurring, or %NULL to
1348 * ignore.
1349 *
1350 * Finishes a stream writev operation.
1351 *
1352 * Returns: %TRUE on success, %FALSE if there was an error
1353 *
1354 * Since: 2.60
1355 */
1356 gboolean
g_output_stream_writev_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)1357 g_output_stream_writev_finish (GOutputStream *stream,
1358 GAsyncResult *result,
1359 gsize *bytes_written,
1360 GError **error)
1361 {
1362 GOutputStreamClass *class;
1363 gboolean res;
1364 gsize _bytes_written = 0;
1365
1366 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1367 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1368 g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
1369
1370 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1371 g_return_val_if_fail (class->writev_finish != NULL, FALSE);
1372
1373 res = class->writev_finish (stream, result, &_bytes_written, error);
1374
1375 g_warn_if_fail (res || _bytes_written == 0);
1376 g_warn_if_fail (res || (error == NULL || *error != NULL));
1377
1378 if (bytes_written)
1379 *bytes_written = _bytes_written;
1380
1381 return res;
1382 }
1383
/* Per-operation state for g_output_stream_writev_all_async(), stored as
 * the GTask's task data and released by free_async_writev_all(). */
typedef struct
{
  GOutputVector *vectors; /* (unowned): caller's array; advanced and edited in place as data is written */
  gsize n_vectors;        /* number of vectors still to be written */
  gsize bytes_written;    /* bytes written so far; reported by the _finish function */
} AsyncWritevAll;
1390
1391 static void
free_async_writev_all(gpointer data)1392 free_async_writev_all (gpointer data)
1393 {
1394 g_slice_free (AsyncWritevAll, data);
1395 }
1396
1397 static void
writev_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)1398 writev_all_callback (GObject *stream,
1399 GAsyncResult *result,
1400 gpointer user_data)
1401 {
1402 GTask *task = user_data;
1403 AsyncWritevAll *data = g_task_get_task_data (task);
1404 gint priority = g_task_get_priority (task);
1405 GCancellable *cancellable = g_task_get_cancellable (task);
1406
1407 if (result)
1408 {
1409 GError *error = NULL;
1410 gboolean res;
1411 gsize n_written = 0;
1412
1413 res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
1414
1415 if (!res)
1416 {
1417 g_task_return_error (task, g_steal_pointer (&error));
1418 g_object_unref (task);
1419 return;
1420 }
1421
1422 g_warn_if_fail (n_written > 0);
1423 data->bytes_written += n_written;
1424
1425 /* skip vectors that have been written in full */
1426 while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
1427 {
1428 n_written -= data->vectors[0].size;
1429 ++data->vectors;
1430 --data->n_vectors;
1431 }
1432 /* skip partially written vector data */
1433 if (n_written > 0 && data->n_vectors > 0)
1434 {
1435 data->vectors[0].size -= n_written;
1436 data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
1437 }
1438 }
1439
1440 if (data->n_vectors == 0)
1441 {
1442 g_task_return_boolean (task, TRUE);
1443 g_object_unref (task);
1444 }
1445 else
1446 g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
1447 data->vectors,
1448 data->n_vectors,
1449 priority,
1450 cancellable,
1451 writev_all_callback, g_steal_pointer (&task));
1452 }
1453
1454 static void
writev_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1455 writev_all_async_thread (GTask *task,
1456 gpointer source_object,
1457 gpointer task_data,
1458 GCancellable *cancellable)
1459 {
1460 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1461 AsyncWritevAll *data = task_data;
1462 GError *error = NULL;
1463
1464 if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
1465 g_task_get_cancellable (task), &error))
1466 g_task_return_boolean (task, TRUE);
1467 else
1468 g_task_return_error (task, g_steal_pointer (&error));
1469 }
1470
1471 /**
1472 * g_output_stream_writev_all_async:
1473 * @stream: A #GOutputStream
1474 * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
1475 * @n_vectors: the number of vectors to write
1476 * @io_priority: the I/O priority of the request
1477 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
1478 * @callback: (scope async): callback to call when the request is satisfied
1479 * @user_data: (closure): the data to pass to callback function
1480 *
1481 * Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
1482 * the stream. When the operation is finished @callback will be called.
1483 * You can then call g_output_stream_writev_all_finish() to get the result of the
1484 * operation.
1485 *
1486 * This is the asynchronous version of g_output_stream_writev_all().
1487 *
1488 * Call g_output_stream_writev_all_finish() to collect the result.
1489 *
1490 * Any outstanding I/O request with higher priority (lower numerical
1491 * value) will be executed before an outstanding request with lower
1492 * priority. Default priority is %G_PRIORITY_DEFAULT.
1493 *
1494 * Note that no copy of @vectors will be made, so it must stay valid
1495 * until @callback is called. The content of the individual elements
1496 * of @vectors might be changed by this function.
1497 *
1498 * Since: 2.60
1499 */
1500 void
g_output_stream_writev_all_async(GOutputStream * stream,GOutputVector * vectors,gsize n_vectors,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1501 g_output_stream_writev_all_async (GOutputStream *stream,
1502 GOutputVector *vectors,
1503 gsize n_vectors,
1504 int io_priority,
1505 GCancellable *cancellable,
1506 GAsyncReadyCallback callback,
1507 gpointer user_data)
1508 {
1509 AsyncWritevAll *data;
1510 GTask *task;
1511 gsize i, to_be_written = 0;
1512
1513 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1514 g_return_if_fail (vectors != NULL || n_vectors == 0);
1515 g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
1516
1517 task = g_task_new (stream, cancellable, callback, user_data);
1518 data = g_slice_new0 (AsyncWritevAll);
1519 data->vectors = vectors;
1520 data->n_vectors = n_vectors;
1521
1522 g_task_set_source_tag (task, g_output_stream_writev_all_async);
1523 g_task_set_task_data (task, data, free_async_writev_all);
1524 g_task_set_priority (task, io_priority);
1525
1526 /* We can't write more than G_MAXSIZE bytes overall, otherwise we
1527 * would overflow the bytes_written counter */
1528 for (i = 0; i < n_vectors; i++)
1529 {
1530 if (to_be_written > G_MAXSIZE - vectors[i].size)
1531 {
1532 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1533 _("Sum of vectors passed to %s too large"),
1534 G_STRFUNC);
1535 g_object_unref (task);
1536 return;
1537 }
1538 to_be_written += vectors[i].size;
1539 }
1540
1541 /* If async writes are going to be handled via the threadpool anyway
1542 * then we may as well do it with a single dispatch instead of
1543 * bouncing in and out.
1544 */
1545 if (g_output_stream_async_writev_is_via_threads (stream))
1546 {
1547 g_task_run_in_thread (task, writev_all_async_thread);
1548 g_object_unref (task);
1549 }
1550 else
1551 writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
1552 }
1553
1554 /**
1555 * g_output_stream_writev_all_finish:
1556 * @stream: a #GOutputStream
1557 * @result: a #GAsyncResult
1558 * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1559 * @error: a #GError location to store the error occurring, or %NULL to ignore.
1560 *
1561 * Finishes an asynchronous stream write operation started with
1562 * g_output_stream_writev_all_async().
1563 *
1564 * As a special exception to the normal conventions for functions that
1565 * use #GError, if this function returns %FALSE (and sets @error) then
1566 * @bytes_written will be set to the number of bytes that were
1567 * successfully written before the error was encountered. This
1568 * functionality is only available from C. If you need it from another
1569 * language then you must write your own loop around
1570 * g_output_stream_writev_async().
1571 *
1572 * Returns: %TRUE on success, %FALSE if there was an error
1573 *
1574 * Since: 2.60
1575 */
1576 gboolean
g_output_stream_writev_all_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)1577 g_output_stream_writev_all_finish (GOutputStream *stream,
1578 GAsyncResult *result,
1579 gsize *bytes_written,
1580 GError **error)
1581 {
1582 GTask *task;
1583
1584 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1585 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1586 g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
1587
1588 task = G_TASK (result);
1589
1590 if (bytes_written)
1591 {
1592 AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
1593
1594 *bytes_written = data->bytes_written;
1595 }
1596
1597 return g_task_propagate_boolean (task, error);
1598 }
1599
1600 static void
write_bytes_callback(GObject * stream,GAsyncResult * result,gpointer user_data)1601 write_bytes_callback (GObject *stream,
1602 GAsyncResult *result,
1603 gpointer user_data)
1604 {
1605 GTask *task = user_data;
1606 GError *error = NULL;
1607 gssize nwrote;
1608
1609 nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
1610 result, &error);
1611 if (nwrote == -1)
1612 g_task_return_error (task, error);
1613 else
1614 g_task_return_int (task, nwrote);
1615 g_object_unref (task);
1616 }
1617
1618 /**
1619 * g_output_stream_write_bytes_async:
1620 * @stream: A #GOutputStream.
1621 * @bytes: The bytes to write
1622 * @io_priority: the io priority of the request.
1623 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1624 * @callback: (scope async): callback to call when the request is satisfied
1625 * @user_data: (closure): the data to pass to callback function
1626 *
1627 * This function is similar to g_output_stream_write_async(), but
1628 * takes a #GBytes as input. Due to the refcounted nature of #GBytes,
1629 * this allows the stream to avoid taking a copy of the data.
1630 *
1631 * However, note that this function may still perform partial writes,
1632 * just like g_output_stream_write_async(). If that occurs, to continue
1633 * writing, you will need to create a new #GBytes containing just the
1634 * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
1635 * #GBytes instance multiple times potentially can result in duplicated
1636 * data in the output stream.
1637 *
1638 * For the synchronous, blocking version of this function, see
1639 * g_output_stream_write_bytes().
1640 **/
1641 void
g_output_stream_write_bytes_async(GOutputStream * stream,GBytes * bytes,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1642 g_output_stream_write_bytes_async (GOutputStream *stream,
1643 GBytes *bytes,
1644 int io_priority,
1645 GCancellable *cancellable,
1646 GAsyncReadyCallback callback,
1647 gpointer user_data)
1648 {
1649 GTask *task;
1650 gsize size;
1651 gconstpointer data;
1652
1653 data = g_bytes_get_data (bytes, &size);
1654
1655 task = g_task_new (stream, cancellable, callback, user_data);
1656 g_task_set_source_tag (task, g_output_stream_write_bytes_async);
1657 g_task_set_task_data (task, g_bytes_ref (bytes),
1658 (GDestroyNotify) g_bytes_unref);
1659
1660 g_output_stream_write_async (stream,
1661 data, size,
1662 io_priority,
1663 cancellable,
1664 write_bytes_callback,
1665 task);
1666 }
1667
1668 /**
1669 * g_output_stream_write_bytes_finish:
1670 * @stream: a #GOutputStream.
1671 * @result: a #GAsyncResult.
1672 * @error: a #GError location to store the error occurring, or %NULL to
1673 * ignore.
1674 *
1675 * Finishes a stream write-from-#GBytes operation.
1676 *
1677 * Returns: a #gssize containing the number of bytes written to the stream.
1678 **/
1679 gssize
g_output_stream_write_bytes_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1680 g_output_stream_write_bytes_finish (GOutputStream *stream,
1681 GAsyncResult *result,
1682 GError **error)
1683 {
1684 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1685 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1686
1687 return g_task_propagate_int (G_TASK (result), error);
1688 }
1689
1690 static void
async_ready_splice_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer _data)1691 async_ready_splice_callback_wrapper (GObject *source_object,
1692 GAsyncResult *res,
1693 gpointer _data)
1694 {
1695 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1696 GOutputStreamClass *class;
1697 GTask *task = _data;
1698 gssize nspliced;
1699 GError *error = NULL;
1700
1701 g_output_stream_clear_pending (stream);
1702
1703 if (g_async_result_legacy_propagate_error (res, &error))
1704 nspliced = -1;
1705 else
1706 {
1707 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1708 nspliced = class->splice_finish (stream, res, &error);
1709 }
1710
1711 if (nspliced >= 0)
1712 g_task_return_int (task, nspliced);
1713 else
1714 g_task_return_error (task, error);
1715 g_object_unref (task);
1716 }
1717
1718 /**
1719 * g_output_stream_splice_async:
1720 * @stream: a #GOutputStream.
1721 * @source: a #GInputStream.
1722 * @flags: a set of #GOutputStreamSpliceFlags.
1723 * @io_priority: the io priority of the request.
1724 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1725 * @callback: (scope async): a #GAsyncReadyCallback.
1726 * @user_data: (closure): user data passed to @callback.
1727 *
1728 * Splices a stream asynchronously.
1729 * When the operation is finished @callback will be called.
1730 * You can then call g_output_stream_splice_finish() to get the
1731 * result of the operation.
1732 *
1733 * For the synchronous, blocking version of this function, see
1734 * g_output_stream_splice().
1735 **/
1736 void
g_output_stream_splice_async(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1737 g_output_stream_splice_async (GOutputStream *stream,
1738 GInputStream *source,
1739 GOutputStreamSpliceFlags flags,
1740 int io_priority,
1741 GCancellable *cancellable,
1742 GAsyncReadyCallback callback,
1743 gpointer user_data)
1744 {
1745 GOutputStreamClass *class;
1746 GTask *task;
1747 GError *error = NULL;
1748
1749 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1750 g_return_if_fail (G_IS_INPUT_STREAM (source));
1751
1752 task = g_task_new (stream, cancellable, callback, user_data);
1753 g_task_set_source_tag (task, g_output_stream_splice_async);
1754 g_task_set_priority (task, io_priority);
1755 g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1756
1757 if (g_input_stream_is_closed (source))
1758 {
1759 g_task_return_new_error (task,
1760 G_IO_ERROR, G_IO_ERROR_CLOSED,
1761 _("Source stream is already closed"));
1762 g_object_unref (task);
1763 return;
1764 }
1765
1766 if (!g_output_stream_set_pending (stream, &error))
1767 {
1768 g_task_return_error (task, error);
1769 g_object_unref (task);
1770 return;
1771 }
1772
1773 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1774
1775 class->splice_async (stream, source, flags, io_priority, cancellable,
1776 async_ready_splice_callback_wrapper, task);
1777 }
1778
1779 /**
1780 * g_output_stream_splice_finish:
1781 * @stream: a #GOutputStream.
1782 * @result: a #GAsyncResult.
1783 * @error: a #GError location to store the error occurring, or %NULL to
1784 * ignore.
1785 *
1786 * Finishes an asynchronous stream splice operation.
1787 *
1788 * Returns: a #gssize of the number of bytes spliced. Note that if the
1789 * number of bytes spliced is greater than %G_MAXSSIZE, then that
1790 * will be returned, and there is no way to determine the actual
1791 * number of bytes spliced.
1792 **/
1793 gssize
g_output_stream_splice_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1794 g_output_stream_splice_finish (GOutputStream *stream,
1795 GAsyncResult *result,
1796 GError **error)
1797 {
1798 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1799 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1800 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1801
1802 /* @result is always the GTask created by g_output_stream_splice_async();
1803 * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1804 */
1805 return g_task_propagate_int (G_TASK (result), error);
1806 }
1807
1808 static void
async_ready_flush_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1809 async_ready_flush_callback_wrapper (GObject *source_object,
1810 GAsyncResult *res,
1811 gpointer user_data)
1812 {
1813 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1814 GOutputStreamClass *class;
1815 GTask *task = user_data;
1816 gboolean flushed;
1817 GError *error = NULL;
1818
1819 g_output_stream_clear_pending (stream);
1820
1821 if (g_async_result_legacy_propagate_error (res, &error))
1822 flushed = FALSE;
1823 else
1824 {
1825 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1826 flushed = class->flush_finish (stream, res, &error);
1827 }
1828
1829 if (flushed)
1830 g_task_return_boolean (task, TRUE);
1831 else
1832 g_task_return_error (task, error);
1833 g_object_unref (task);
1834 }
1835
1836 /**
1837 * g_output_stream_flush_async:
1838 * @stream: a #GOutputStream.
1839 * @io_priority: the io priority of the request.
1840 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1841 * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1842 * @user_data: (closure): the data to pass to callback function
1843 *
1844 * Forces an asynchronous write of all user-space buffered data for
1845 * the given @stream.
1846 * For behaviour details see g_output_stream_flush().
1847 *
1848 * When the operation is finished @callback will be
1849 * called. You can then call g_output_stream_flush_finish() to get the
1850 * result of the operation.
1851 **/
1852 void
g_output_stream_flush_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1853 g_output_stream_flush_async (GOutputStream *stream,
1854 int io_priority,
1855 GCancellable *cancellable,
1856 GAsyncReadyCallback callback,
1857 gpointer user_data)
1858 {
1859 GOutputStreamClass *class;
1860 GTask *task;
1861 GError *error = NULL;
1862
1863 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1864
1865 task = g_task_new (stream, cancellable, callback, user_data);
1866 g_task_set_source_tag (task, g_output_stream_flush_async);
1867 g_task_set_priority (task, io_priority);
1868
1869 if (!g_output_stream_set_pending (stream, &error))
1870 {
1871 g_task_return_error (task, error);
1872 g_object_unref (task);
1873 return;
1874 }
1875
1876 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1877
1878 if (class->flush_async == NULL)
1879 {
1880 g_output_stream_clear_pending (stream);
1881 g_task_return_boolean (task, TRUE);
1882 g_object_unref (task);
1883 return;
1884 }
1885
1886 class->flush_async (stream, io_priority, cancellable,
1887 async_ready_flush_callback_wrapper, task);
1888 }
1889
1890 /**
1891 * g_output_stream_flush_finish:
1892 * @stream: a #GOutputStream.
1893 * @result: a GAsyncResult.
1894 * @error: a #GError location to store the error occurring, or %NULL to
1895 * ignore.
1896 *
1897 * Finishes flushing an output stream.
1898 *
1899 * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1900 **/
1901 gboolean
g_output_stream_flush_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)1902 g_output_stream_flush_finish (GOutputStream *stream,
1903 GAsyncResult *result,
1904 GError **error)
1905 {
1906 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1907 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1908 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1909
1910 /* @result is always the GTask created by g_output_stream_flush_async();
1911 * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1912 */
1913 return g_task_propagate_boolean (G_TASK (result), error);
1914 }
1915
1916
1917 static void
async_ready_close_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1918 async_ready_close_callback_wrapper (GObject *source_object,
1919 GAsyncResult *res,
1920 gpointer user_data)
1921 {
1922 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1923 GOutputStreamClass *class;
1924 GTask *task = user_data;
1925 GError *error = g_task_get_task_data (task);
1926
1927 stream->priv->closing = FALSE;
1928 stream->priv->closed = TRUE;
1929
1930 if (!error && !g_async_result_legacy_propagate_error (res, &error))
1931 {
1932 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1933
1934 class->close_finish (stream, res,
1935 error ? NULL : &error);
1936 }
1937
1938 if (error != NULL)
1939 g_task_return_error (task, error);
1940 else
1941 g_task_return_boolean (task, TRUE);
1942 g_object_unref (task);
1943 }
1944
1945 static void
async_ready_close_flushed_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1946 async_ready_close_flushed_callback_wrapper (GObject *source_object,
1947 GAsyncResult *res,
1948 gpointer user_data)
1949 {
1950 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1951 GOutputStreamClass *class;
1952 GTask *task = user_data;
1953 GError *error = NULL;
1954
1955 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1956
1957 if (!g_async_result_legacy_propagate_error (res, &error))
1958 {
1959 class->flush_finish (stream, res, &error);
1960 }
1961
1962 /* propagate the possible error */
1963 if (error)
1964 g_task_set_task_data (task, error, NULL);
1965
1966 /* we still close, even if there was a flush error */
1967 class->close_async (stream,
1968 g_task_get_priority (task),
1969 g_task_get_cancellable (task),
1970 async_ready_close_callback_wrapper, task);
1971 }
1972
1973 static void
real_close_async_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1974 real_close_async_cb (GObject *source_object,
1975 GAsyncResult *res,
1976 gpointer user_data)
1977 {
1978 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1979 GTask *task = user_data;
1980 GError *error = NULL;
1981 gboolean ret;
1982
1983 g_output_stream_clear_pending (stream);
1984
1985 ret = g_output_stream_internal_close_finish (stream, res, &error);
1986
1987 if (error != NULL)
1988 g_task_return_error (task, error);
1989 else
1990 g_task_return_boolean (task, ret);
1991
1992 g_object_unref (task);
1993 }
1994
1995 /**
1996 * g_output_stream_close_async:
1997 * @stream: A #GOutputStream.
1998 * @io_priority: the io priority of the request.
1999 * @cancellable: (nullable): optional cancellable object
2000 * @callback: (scope async): callback to call when the request is satisfied
2001 * @user_data: (closure): the data to pass to callback function
2002 *
2003 * Requests an asynchronous close of the stream, releasing resources
2004 * related to it. When the operation is finished @callback will be
2005 * called. You can then call g_output_stream_close_finish() to get
2006 * the result of the operation.
2007 *
2008 * For behaviour details see g_output_stream_close().
2009 *
2010 * The asynchronous methods have a default fallback that uses threads
2011 * to implement asynchronicity, so they are optional for inheriting
2012 * classes. However, if you override one you must override all.
2013 **/
2014 void
g_output_stream_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2015 g_output_stream_close_async (GOutputStream *stream,
2016 int io_priority,
2017 GCancellable *cancellable,
2018 GAsyncReadyCallback callback,
2019 gpointer user_data)
2020 {
2021 GTask *task;
2022 GError *error = NULL;
2023
2024 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
2025
2026 task = g_task_new (stream, cancellable, callback, user_data);
2027 g_task_set_source_tag (task, g_output_stream_close_async);
2028 g_task_set_priority (task, io_priority);
2029
2030 if (!g_output_stream_set_pending (stream, &error))
2031 {
2032 g_task_return_error (task, error);
2033 g_object_unref (task);
2034 return;
2035 }
2036
2037 g_output_stream_internal_close_async (stream, io_priority, cancellable,
2038 real_close_async_cb, task);
2039 }
2040
2041 /* Must always be called inside
2042 * g_output_stream_set_pending()/g_output_stream_clear_pending().
2043 */
2044 void
g_output_stream_internal_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2045 g_output_stream_internal_close_async (GOutputStream *stream,
2046 int io_priority,
2047 GCancellable *cancellable,
2048 GAsyncReadyCallback callback,
2049 gpointer user_data)
2050 {
2051 GOutputStreamClass *class;
2052 GTask *task;
2053
2054 task = g_task_new (stream, cancellable, callback, user_data);
2055 g_task_set_source_tag (task, g_output_stream_internal_close_async);
2056 g_task_set_priority (task, io_priority);
2057
2058 if (stream->priv->closed)
2059 {
2060 g_task_return_boolean (task, TRUE);
2061 g_object_unref (task);
2062 return;
2063 }
2064
2065 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2066 stream->priv->closing = TRUE;
2067
2068 /* Call close_async directly if there is no need to flush, or if the flush
2069 can be done sync (in the output stream async close thread) */
2070 if (class->flush_async == NULL ||
2071 (class->flush_async == g_output_stream_real_flush_async &&
2072 (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
2073 {
2074 class->close_async (stream, io_priority, cancellable,
2075 async_ready_close_callback_wrapper, task);
2076 }
2077 else
2078 {
2079 /* First do an async flush, then do the async close in the callback
2080 wrapper (see async_ready_close_flushed_callback_wrapper) */
2081 class->flush_async (stream, io_priority, cancellable,
2082 async_ready_close_flushed_callback_wrapper, task);
2083 }
2084 }
2085
2086 static gboolean
g_output_stream_internal_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2087 g_output_stream_internal_close_finish (GOutputStream *stream,
2088 GAsyncResult *result,
2089 GError **error)
2090 {
2091 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2092 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2093 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
2094
2095 return g_task_propagate_boolean (G_TASK (result), error);
2096 }
2097
2098 /**
2099 * g_output_stream_close_finish:
2100 * @stream: a #GOutputStream.
2101 * @result: a #GAsyncResult.
2102 * @error: a #GError location to store the error occurring, or %NULL to
2103 * ignore.
2104 *
2105 * Closes an output stream.
2106 *
2107 * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
2108 **/
2109 gboolean
g_output_stream_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2110 g_output_stream_close_finish (GOutputStream *stream,
2111 GAsyncResult *result,
2112 GError **error)
2113 {
2114 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2115 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2116 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
2117
2118 /* @result is always the GTask created by g_output_stream_close_async();
2119 * we called class->close_finish() from async_ready_close_callback_wrapper.
2120 */
2121 return g_task_propagate_boolean (G_TASK (result), error);
2122 }
2123
2124 /**
2125 * g_output_stream_is_closed:
2126 * @stream: a #GOutputStream.
2127 *
2128 * Checks if an output stream has already been closed.
2129 *
2130 * Returns: %TRUE if @stream is closed. %FALSE otherwise.
2131 **/
2132 gboolean
g_output_stream_is_closed(GOutputStream * stream)2133 g_output_stream_is_closed (GOutputStream *stream)
2134 {
2135 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
2136
2137 return stream->priv->closed;
2138 }
2139
2140 /**
2141 * g_output_stream_is_closing:
2142 * @stream: a #GOutputStream.
2143 *
2144 * Checks if an output stream is being closed. This can be
2145 * used inside e.g. a flush implementation to see if the
2146 * flush (or other i/o operation) is called from within
2147 * the closing operation.
2148 *
2149 * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
2150 *
2151 * Since: 2.24
2152 **/
2153 gboolean
g_output_stream_is_closing(GOutputStream * stream)2154 g_output_stream_is_closing (GOutputStream *stream)
2155 {
2156 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
2157
2158 return stream->priv->closing;
2159 }
2160
2161 /**
2162 * g_output_stream_has_pending:
2163 * @stream: a #GOutputStream.
2164 *
2165 * Checks if an output stream has pending actions.
2166 *
2167 * Returns: %TRUE if @stream has pending actions.
2168 **/
2169 gboolean
g_output_stream_has_pending(GOutputStream * stream)2170 g_output_stream_has_pending (GOutputStream *stream)
2171 {
2172 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2173
2174 return stream->priv->pending;
2175 }
2176
2177 /**
2178 * g_output_stream_set_pending:
2179 * @stream: a #GOutputStream.
2180 * @error: a #GError location to store the error occurring, or %NULL to
2181 * ignore.
2182 *
2183 * Sets @stream to have actions pending. If the pending flag is
2184 * already set or @stream is closed, it will return %FALSE and set
2185 * @error.
2186 *
2187 * Returns: %TRUE if pending was previously unset and is now set.
2188 **/
2189 gboolean
g_output_stream_set_pending(GOutputStream * stream,GError ** error)2190 g_output_stream_set_pending (GOutputStream *stream,
2191 GError **error)
2192 {
2193 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2194
2195 if (stream->priv->closed)
2196 {
2197 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
2198 _("Stream is already closed"));
2199 return FALSE;
2200 }
2201
2202 if (stream->priv->pending)
2203 {
2204 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
2205 /* Translators: This is an error you get if there is
2206 * already an operation running against this stream when
2207 * you try to start one */
2208 _("Stream has outstanding operation"));
2209 return FALSE;
2210 }
2211
2212 stream->priv->pending = TRUE;
2213 return TRUE;
2214 }
2215
2216 /**
2217 * g_output_stream_clear_pending:
2218 * @stream: output stream
2219 *
2220 * Clears the pending flag on @stream.
2221 **/
2222 void
g_output_stream_clear_pending(GOutputStream * stream)2223 g_output_stream_clear_pending (GOutputStream *stream)
2224 {
2225 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
2226
2227 stream->priv->pending = FALSE;
2228 }
2229
2230 /*< internal >
2231 * g_output_stream_async_write_is_via_threads:
2232 * @stream: a #GOutputStream.
2233 *
2234 * Checks if an output stream's write_async function uses threads.
2235 *
2236 * Returns: %TRUE if @stream's write_async function uses threads.
2237 **/
2238 gboolean
g_output_stream_async_write_is_via_threads(GOutputStream * stream)2239 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
2240 {
2241 GOutputStreamClass *class;
2242
2243 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2244
2245 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2246
2247 return (class->write_async == g_output_stream_real_write_async &&
2248 !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
2249 g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
2250 }
2251
2252 /*< internal >
2253 * g_output_stream_async_writev_is_via_threads:
2254 * @stream: a #GOutputStream.
2255 *
2256 * Checks if an output stream's writev_async function uses threads.
2257 *
2258 * Returns: %TRUE if @stream's writev_async function uses threads.
2259 **/
2260 gboolean
g_output_stream_async_writev_is_via_threads(GOutputStream * stream)2261 g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
2262 {
2263 GOutputStreamClass *class;
2264
2265 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2266
2267 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2268
2269 return (class->writev_async == g_output_stream_real_writev_async &&
2270 !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
2271 g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
2272 }
2273
2274 /*< internal >
2275 * g_output_stream_async_close_is_via_threads:
2276 * @stream: output stream
2277 *
2278 * Checks if an output stream's close_async function uses threads.
2279 *
2280 * Returns: %TRUE if @stream's close_async function uses threads.
2281 **/
2282 gboolean
g_output_stream_async_close_is_via_threads(GOutputStream * stream)2283 g_output_stream_async_close_is_via_threads (GOutputStream *stream)
2284 {
2285 GOutputStreamClass *class;
2286
2287 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2288
2289 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2290
2291 return class->close_async == g_output_stream_real_close_async;
2292 }
2293
2294 /********************************************
2295 * Default implementation of sync ops *
2296 ********************************************/
2297 static gboolean
g_output_stream_real_writev(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,gsize * bytes_written,GCancellable * cancellable,GError ** error)2298 g_output_stream_real_writev (GOutputStream *stream,
2299 const GOutputVector *vectors,
2300 gsize n_vectors,
2301 gsize *bytes_written,
2302 GCancellable *cancellable,
2303 GError **error)
2304 {
2305 GOutputStreamClass *class;
2306 gsize _bytes_written = 0;
2307 gsize i;
2308 GError *err = NULL;
2309
2310 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2311
2312 if (bytes_written)
2313 *bytes_written = 0;
2314
2315 for (i = 0; i < n_vectors; i++)
2316 {
2317 gssize res = 0;
2318
2319 /* Would we overflow here? In that case simply return and let the caller
2320 * handle this like a short write */
2321 if (_bytes_written > G_MAXSIZE - vectors[i].size)
2322 break;
2323
2324 res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
2325
2326 if (res == -1)
2327 {
2328 /* If we already wrote something we handle this like a short write
2329 * and assume that on the next call the same error happens again, or
2330 * everything finishes successfully without data loss then
2331 */
2332 if (_bytes_written > 0)
2333 {
2334 if (bytes_written)
2335 *bytes_written = _bytes_written;
2336
2337 g_clear_error (&err);
2338 return TRUE;
2339 }
2340
2341 g_propagate_error (error, err);
2342 return FALSE;
2343 }
2344
2345 _bytes_written += res;
2346 /* if we had a short write break the loop here */
2347 if ((gsize) res < vectors[i].size)
2348 break;
2349 }
2350
2351 if (bytes_written)
2352 *bytes_written = _bytes_written;
2353
2354 return TRUE;
2355 }
2356
2357 /********************************************
2358 * Default implementation of async ops *
2359 ********************************************/
2360
2361 typedef struct {
2362 const void *buffer;
2363 gsize count_requested;
2364 gssize count_written;
2365 } WriteData;
2366
2367 static void
free_write_data(WriteData * op)2368 free_write_data (WriteData *op)
2369 {
2370 g_slice_free (WriteData, op);
2371 }
2372
2373 static void
write_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2374 write_async_thread (GTask *task,
2375 gpointer source_object,
2376 gpointer task_data,
2377 GCancellable *cancellable)
2378 {
2379 GOutputStream *stream = source_object;
2380 WriteData *op = task_data;
2381 GOutputStreamClass *class;
2382 GError *error = NULL;
2383 gssize count_written;
2384
2385 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2386 count_written = class->write_fn (stream, op->buffer, op->count_requested,
2387 cancellable, &error);
2388 if (count_written == -1)
2389 g_task_return_error (task, error);
2390 else
2391 g_task_return_int (task, count_written);
2392 }
2393
2394 static void write_async_pollable (GPollableOutputStream *stream,
2395 GTask *task);
2396
2397 static gboolean
write_async_pollable_ready(GPollableOutputStream * stream,gpointer user_data)2398 write_async_pollable_ready (GPollableOutputStream *stream,
2399 gpointer user_data)
2400 {
2401 GTask *task = user_data;
2402
2403 write_async_pollable (stream, task);
2404 return FALSE;
2405 }
2406
2407 static void
write_async_pollable(GPollableOutputStream * stream,GTask * task)2408 write_async_pollable (GPollableOutputStream *stream,
2409 GTask *task)
2410 {
2411 GError *error = NULL;
2412 WriteData *op = g_task_get_task_data (task);
2413 gssize count_written;
2414
2415 if (g_task_return_error_if_cancelled (task))
2416 return;
2417
2418 count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
2419 write_nonblocking (stream, op->buffer, op->count_requested, &error);
2420
2421 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
2422 {
2423 GSource *source;
2424
2425 g_error_free (error);
2426
2427 source = g_pollable_output_stream_create_source (stream,
2428 g_task_get_cancellable (task));
2429 g_task_attach_source (task, source,
2430 (GSourceFunc) write_async_pollable_ready);
2431 g_source_unref (source);
2432 return;
2433 }
2434
2435 if (count_written == -1)
2436 g_task_return_error (task, error);
2437 else
2438 g_task_return_int (task, count_written);
2439 }
2440
2441 static void
g_output_stream_real_write_async(GOutputStream * stream,const void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2442 g_output_stream_real_write_async (GOutputStream *stream,
2443 const void *buffer,
2444 gsize count,
2445 int io_priority,
2446 GCancellable *cancellable,
2447 GAsyncReadyCallback callback,
2448 gpointer user_data)
2449 {
2450 GTask *task;
2451 WriteData *op;
2452
2453 op = g_slice_new0 (WriteData);
2454 task = g_task_new (stream, cancellable, callback, user_data);
2455 g_task_set_check_cancellable (task, FALSE);
2456 g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
2457 op->buffer = buffer;
2458 op->count_requested = count;
2459
2460 if (!g_output_stream_async_write_is_via_threads (stream))
2461 write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
2462 else
2463 g_task_run_in_thread (task, write_async_thread);
2464 g_object_unref (task);
2465 }
2466
2467 static gssize
g_output_stream_real_write_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2468 g_output_stream_real_write_finish (GOutputStream *stream,
2469 GAsyncResult *result,
2470 GError **error)
2471 {
2472 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2473
2474 return g_task_propagate_int (G_TASK (result), error);
2475 }
2476
2477 typedef struct {
2478 const GOutputVector *vectors;
2479 gsize n_vectors; /* (unowned) */
2480 gsize bytes_written;
2481 } WritevData;
2482
2483 static void
free_writev_data(WritevData * op)2484 free_writev_data (WritevData *op)
2485 {
2486 g_slice_free (WritevData, op);
2487 }
2488
2489 static void
writev_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2490 writev_async_thread (GTask *task,
2491 gpointer source_object,
2492 gpointer task_data,
2493 GCancellable *cancellable)
2494 {
2495 GOutputStream *stream = source_object;
2496 WritevData *op = task_data;
2497 GOutputStreamClass *class;
2498 GError *error = NULL;
2499 gboolean res;
2500
2501 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2502 res = class->writev_fn (stream, op->vectors, op->n_vectors,
2503 &op->bytes_written, cancellable, &error);
2504
2505 g_warn_if_fail (res || op->bytes_written == 0);
2506 g_warn_if_fail (res || error != NULL);
2507
2508 if (!res)
2509 g_task_return_error (task, g_steal_pointer (&error));
2510 else
2511 g_task_return_boolean (task, TRUE);
2512 }
2513
2514 static void writev_async_pollable (GPollableOutputStream *stream,
2515 GTask *task);
2516
2517 static gboolean
writev_async_pollable_ready(GPollableOutputStream * stream,gpointer user_data)2518 writev_async_pollable_ready (GPollableOutputStream *stream,
2519 gpointer user_data)
2520 {
2521 GTask *task = user_data;
2522
2523 writev_async_pollable (stream, task);
2524 return G_SOURCE_REMOVE;
2525 }
2526
2527 static void
writev_async_pollable(GPollableOutputStream * stream,GTask * task)2528 writev_async_pollable (GPollableOutputStream *stream,
2529 GTask *task)
2530 {
2531 GError *error = NULL;
2532 WritevData *op = g_task_get_task_data (task);
2533 GPollableReturn res;
2534 gsize bytes_written = 0;
2535
2536 if (g_task_return_error_if_cancelled (task))
2537 return;
2538
2539 res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
2540 writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
2541
2542 switch (res)
2543 {
2544 case G_POLLABLE_RETURN_WOULD_BLOCK:
2545 {
2546 GSource *source;
2547
2548 g_warn_if_fail (error == NULL);
2549 g_warn_if_fail (bytes_written == 0);
2550
2551 source = g_pollable_output_stream_create_source (stream,
2552 g_task_get_cancellable (task));
2553 g_task_attach_source (task, source,
2554 (GSourceFunc) writev_async_pollable_ready);
2555 g_source_unref (source);
2556 }
2557 break;
2558 case G_POLLABLE_RETURN_OK:
2559 g_warn_if_fail (error == NULL);
2560 op->bytes_written = bytes_written;
2561 g_task_return_boolean (task, TRUE);
2562 break;
2563 case G_POLLABLE_RETURN_FAILED:
2564 g_warn_if_fail (bytes_written == 0);
2565 g_warn_if_fail (error != NULL);
2566 g_task_return_error (task, g_steal_pointer (&error));
2567 break;
2568 default:
2569 g_assert_not_reached ();
2570 }
2571 }
2572
2573 static void
g_output_stream_real_writev_async(GOutputStream * stream,const GOutputVector * vectors,gsize n_vectors,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2574 g_output_stream_real_writev_async (GOutputStream *stream,
2575 const GOutputVector *vectors,
2576 gsize n_vectors,
2577 int io_priority,
2578 GCancellable *cancellable,
2579 GAsyncReadyCallback callback,
2580 gpointer user_data)
2581 {
2582 GTask *task;
2583 WritevData *op;
2584 GError *error = NULL;
2585
2586 op = g_slice_new0 (WritevData);
2587 task = g_task_new (stream, cancellable, callback, user_data);
2588 op->vectors = vectors;
2589 op->n_vectors = n_vectors;
2590
2591 g_task_set_check_cancellable (task, FALSE);
2592 g_task_set_source_tag (task, g_output_stream_writev_async);
2593 g_task_set_priority (task, io_priority);
2594 g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
2595
2596 if (n_vectors == 0)
2597 {
2598 g_task_return_boolean (task, TRUE);
2599 g_object_unref (task);
2600 return;
2601 }
2602
2603 if (!g_output_stream_set_pending (stream, &error))
2604 {
2605 g_task_return_error (task, g_steal_pointer (&error));
2606 g_object_unref (task);
2607 return;
2608 }
2609
2610 if (!g_output_stream_async_writev_is_via_threads (stream))
2611 writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
2612 else
2613 g_task_run_in_thread (task, writev_async_thread);
2614
2615 g_object_unref (task);
2616 }
2617
2618 static gboolean
g_output_stream_real_writev_finish(GOutputStream * stream,GAsyncResult * result,gsize * bytes_written,GError ** error)2619 g_output_stream_real_writev_finish (GOutputStream *stream,
2620 GAsyncResult *result,
2621 gsize *bytes_written,
2622 GError **error)
2623 {
2624 GTask *task;
2625
2626 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2627 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
2628
2629 g_output_stream_clear_pending (stream);
2630
2631 task = G_TASK (result);
2632
2633 if (bytes_written)
2634 {
2635 WritevData *op = g_task_get_task_data (task);
2636
2637 *bytes_written = op->bytes_written;
2638 }
2639
2640 return g_task_propagate_boolean (task, error);
2641 }
2642
2643 typedef struct {
2644 GInputStream *source;
2645 GOutputStreamSpliceFlags flags;
2646 guint istream_closed : 1;
2647 guint ostream_closed : 1;
2648 gssize n_read;
2649 gssize n_written;
2650 gsize bytes_copied;
2651 GError *error;
2652 guint8 *buffer;
2653 } SpliceData;
2654
2655 static void
free_splice_data(SpliceData * op)2656 free_splice_data (SpliceData *op)
2657 {
2658 g_clear_pointer (&op->buffer, g_free);
2659 g_object_unref (op->source);
2660 g_clear_error (&op->error);
2661 g_free (op);
2662 }
2663
2664 static void
real_splice_async_complete_cb(GTask * task)2665 real_splice_async_complete_cb (GTask *task)
2666 {
2667 SpliceData *op = g_task_get_task_data (task);
2668
2669 if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
2670 !op->istream_closed)
2671 return;
2672
2673 if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
2674 !op->ostream_closed)
2675 return;
2676
2677 if (op->error != NULL)
2678 {
2679 g_task_return_error (task, op->error);
2680 op->error = NULL;
2681 }
2682 else
2683 {
2684 g_task_return_int (task, op->bytes_copied);
2685 }
2686
2687 g_object_unref (task);
2688 }
2689
2690 static void
real_splice_async_close_input_cb(GObject * source,GAsyncResult * res,gpointer user_data)2691 real_splice_async_close_input_cb (GObject *source,
2692 GAsyncResult *res,
2693 gpointer user_data)
2694 {
2695 GTask *task = user_data;
2696 SpliceData *op = g_task_get_task_data (task);
2697
2698 g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
2699 op->istream_closed = TRUE;
2700
2701 real_splice_async_complete_cb (task);
2702 }
2703
2704 static void
real_splice_async_close_output_cb(GObject * source,GAsyncResult * res,gpointer user_data)2705 real_splice_async_close_output_cb (GObject *source,
2706 GAsyncResult *res,
2707 gpointer user_data)
2708 {
2709 GTask *task = G_TASK (user_data);
2710 SpliceData *op = g_task_get_task_data (task);
2711 GError **error = (op->error == NULL) ? &op->error : NULL;
2712
2713 g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
2714 op->ostream_closed = TRUE;
2715
2716 real_splice_async_complete_cb (task);
2717 }
2718
2719 static void
real_splice_async_complete(GTask * task)2720 real_splice_async_complete (GTask *task)
2721 {
2722 SpliceData *op = g_task_get_task_data (task);
2723 gboolean done = TRUE;
2724
2725 if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
2726 {
2727 done = FALSE;
2728 g_input_stream_close_async (op->source, g_task_get_priority (task),
2729 g_task_get_cancellable (task),
2730 real_splice_async_close_input_cb, task);
2731 }
2732
2733 if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
2734 {
2735 done = FALSE;
2736 g_output_stream_internal_close_async (g_task_get_source_object (task),
2737 g_task_get_priority (task),
2738 g_task_get_cancellable (task),
2739 real_splice_async_close_output_cb,
2740 task);
2741 }
2742
2743 if (done)
2744 real_splice_async_complete_cb (task);
2745 }
2746
2747 static void real_splice_async_read_cb (GObject *source,
2748 GAsyncResult *res,
2749 gpointer user_data);
2750
2751 static void
real_splice_async_write_cb(GObject * source,GAsyncResult * res,gpointer user_data)2752 real_splice_async_write_cb (GObject *source,
2753 GAsyncResult *res,
2754 gpointer user_data)
2755 {
2756 GOutputStreamClass *class;
2757 GTask *task = G_TASK (user_data);
2758 SpliceData *op = g_task_get_task_data (task);
2759 gssize ret;
2760
2761 class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
2762
2763 ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
2764
2765 if (ret == -1)
2766 {
2767 real_splice_async_complete (task);
2768 return;
2769 }
2770
2771 op->n_written += ret;
2772 op->bytes_copied += ret;
2773 if (op->bytes_copied > G_MAXSSIZE)
2774 op->bytes_copied = G_MAXSSIZE;
2775
2776 if (op->n_written < op->n_read)
2777 {
2778 class->write_async (g_task_get_source_object (task),
2779 op->buffer + op->n_written,
2780 op->n_read - op->n_written,
2781 g_task_get_priority (task),
2782 g_task_get_cancellable (task),
2783 real_splice_async_write_cb, task);
2784 return;
2785 }
2786
2787 g_input_stream_read_async (op->source, op->buffer, 8192,
2788 g_task_get_priority (task),
2789 g_task_get_cancellable (task),
2790 real_splice_async_read_cb, task);
2791 }
2792
2793 static void
real_splice_async_read_cb(GObject * source,GAsyncResult * res,gpointer user_data)2794 real_splice_async_read_cb (GObject *source,
2795 GAsyncResult *res,
2796 gpointer user_data)
2797 {
2798 GOutputStreamClass *class;
2799 GTask *task = G_TASK (user_data);
2800 SpliceData *op = g_task_get_task_data (task);
2801 gssize ret;
2802
2803 class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
2804
2805 ret = g_input_stream_read_finish (op->source, res, &op->error);
2806 if (ret == -1 || ret == 0)
2807 {
2808 real_splice_async_complete (task);
2809 return;
2810 }
2811
2812 op->n_read = ret;
2813 op->n_written = 0;
2814
2815 class->write_async (g_task_get_source_object (task), op->buffer,
2816 op->n_read, g_task_get_priority (task),
2817 g_task_get_cancellable (task),
2818 real_splice_async_write_cb, task);
2819 }
2820
2821 static void
splice_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2822 splice_async_thread (GTask *task,
2823 gpointer source_object,
2824 gpointer task_data,
2825 GCancellable *cancellable)
2826 {
2827 GOutputStream *stream = source_object;
2828 SpliceData *op = task_data;
2829 GOutputStreamClass *class;
2830 GError *error = NULL;
2831 gssize bytes_copied;
2832
2833 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2834
2835 bytes_copied = class->splice (stream,
2836 op->source,
2837 op->flags,
2838 cancellable,
2839 &error);
2840 if (bytes_copied == -1)
2841 g_task_return_error (task, error);
2842 else
2843 g_task_return_int (task, bytes_copied);
2844 }
2845
2846 static void
g_output_stream_real_splice_async(GOutputStream * stream,GInputStream * source,GOutputStreamSpliceFlags flags,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2847 g_output_stream_real_splice_async (GOutputStream *stream,
2848 GInputStream *source,
2849 GOutputStreamSpliceFlags flags,
2850 int io_priority,
2851 GCancellable *cancellable,
2852 GAsyncReadyCallback callback,
2853 gpointer user_data)
2854 {
2855 GTask *task;
2856 SpliceData *op;
2857
2858 op = g_new0 (SpliceData, 1);
2859 task = g_task_new (stream, cancellable, callback, user_data);
2860 g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
2861 op->flags = flags;
2862 op->source = g_object_ref (source);
2863
2864 if (g_input_stream_async_read_is_via_threads (source) &&
2865 g_output_stream_async_write_is_via_threads (stream))
2866 {
2867 g_task_run_in_thread (task, splice_async_thread);
2868 g_object_unref (task);
2869 }
2870 else
2871 {
2872 op->buffer = g_malloc (8192);
2873 g_input_stream_read_async (op->source, op->buffer, 8192,
2874 g_task_get_priority (task),
2875 g_task_get_cancellable (task),
2876 real_splice_async_read_cb, task);
2877 }
2878 }
2879
2880 static gssize
g_output_stream_real_splice_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2881 g_output_stream_real_splice_finish (GOutputStream *stream,
2882 GAsyncResult *result,
2883 GError **error)
2884 {
2885 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2886
2887 return g_task_propagate_int (G_TASK (result), error);
2888 }
2889
2890
2891 static void
flush_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2892 flush_async_thread (GTask *task,
2893 gpointer source_object,
2894 gpointer task_data,
2895 GCancellable *cancellable)
2896 {
2897 GOutputStream *stream = source_object;
2898 GOutputStreamClass *class;
2899 gboolean result;
2900 GError *error = NULL;
2901
2902 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2903 result = TRUE;
2904 if (class->flush)
2905 result = class->flush (stream, cancellable, &error);
2906
2907 if (result)
2908 g_task_return_boolean (task, TRUE);
2909 else
2910 g_task_return_error (task, error);
2911 }
2912
2913 static void
g_output_stream_real_flush_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2914 g_output_stream_real_flush_async (GOutputStream *stream,
2915 int io_priority,
2916 GCancellable *cancellable,
2917 GAsyncReadyCallback callback,
2918 gpointer user_data)
2919 {
2920 GTask *task;
2921
2922 task = g_task_new (stream, cancellable, callback, user_data);
2923 g_task_set_priority (task, io_priority);
2924 g_task_run_in_thread (task, flush_async_thread);
2925 g_object_unref (task);
2926 }
2927
2928 static gboolean
g_output_stream_real_flush_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2929 g_output_stream_real_flush_finish (GOutputStream *stream,
2930 GAsyncResult *result,
2931 GError **error)
2932 {
2933 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2934
2935 return g_task_propagate_boolean (G_TASK (result), error);
2936 }
2937
2938 static void
close_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)2939 close_async_thread (GTask *task,
2940 gpointer source_object,
2941 gpointer task_data,
2942 GCancellable *cancellable)
2943 {
2944 GOutputStream *stream = source_object;
2945 GOutputStreamClass *class;
2946 GError *error = NULL;
2947 gboolean result = TRUE;
2948
2949 class = G_OUTPUT_STREAM_GET_CLASS (stream);
2950
2951 /* Do a flush here if there is a flush function, and we did not have to do
2952 * an async flush before (see g_output_stream_close_async)
2953 */
2954 if (class->flush != NULL &&
2955 (class->flush_async == NULL ||
2956 class->flush_async == g_output_stream_real_flush_async))
2957 {
2958 result = class->flush (stream, cancellable, &error);
2959 }
2960
2961 /* Auto handling of cancellation disabled, and ignore
2962 cancellation, since we want to close things anyway, although
2963 possibly in a quick-n-dirty way. At least we never want to leak
2964 open handles */
2965
2966 if (class->close_fn)
2967 {
2968 /* Make sure to close, even if the flush failed (see sync close) */
2969 if (!result)
2970 class->close_fn (stream, cancellable, NULL);
2971 else
2972 result = class->close_fn (stream, cancellable, &error);
2973 }
2974
2975 if (result)
2976 g_task_return_boolean (task, TRUE);
2977 else
2978 g_task_return_error (task, error);
2979 }
2980
2981 static void
g_output_stream_real_close_async(GOutputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)2982 g_output_stream_real_close_async (GOutputStream *stream,
2983 int io_priority,
2984 GCancellable *cancellable,
2985 GAsyncReadyCallback callback,
2986 gpointer user_data)
2987 {
2988 GTask *task;
2989
2990 task = g_task_new (stream, cancellable, callback, user_data);
2991 g_task_set_priority (task, io_priority);
2992 g_task_run_in_thread (task, close_async_thread);
2993 g_object_unref (task);
2994 }
2995
2996 static gboolean
g_output_stream_real_close_finish(GOutputStream * stream,GAsyncResult * result,GError ** error)2997 g_output_stream_real_close_finish (GOutputStream *stream,
2998 GAsyncResult *result,
2999 GError **error)
3000 {
3001 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
3002
3003 return g_task_propagate_boolean (G_TASK (result), error);
3004 }
3005