1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 *
18 * Author: Alexander Larsson <alexl@redhat.com>
19 */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34 * SECTION:ginputstream
35 * @short_description: Base class for implementing streaming input
36 * @include: gio/gio.h
37 *
38 * #GInputStream has functions to read from a stream (g_input_stream_read()),
39 * to close a stream (g_input_stream_close()) and to skip some content
40 * (g_input_stream_skip()).
41 *
42 * To copy the content of an input stream to an output stream without
43 * manually handling the reads and writes, use g_output_stream_splice().
44 *
45 * See the documentation for #GIOStream for details of thread safety of
46 * streaming APIs.
47 *
48 * All of these functions have async variants too.
49 **/
50
/* Per-instance private state shared by the GInputStream entry points in
 * this file. */
struct _GInputStreamPrivate {
  /* Set to TRUE once a close operation (sync or async) has run; checked by
   * dispose and by g_input_stream_close() to avoid closing twice. */
  guint closed : 1;
  /* "Operation in progress" flag; the default skip implementation toggles
   * it directly around its g_seekable_seek() calls. */
  guint pending : 1;
  /* Caller's callback for the in-flight async operation; stored before the
   * class vfunc is started and invoked by the async_ready_*_wrapper
   * functions. */
  GAsyncReadyCallback outstanding_callback;
};
56
/* Registers GInputStream as an abstract GObject subclass with a private
 * instance struct (GInputStreamPrivate). */
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)

/* Forward declarations of the default vfunc implementations that
 * g_input_stream_class_init() installs into GInputStreamClass. */
static gssize   g_input_stream_real_skip         (GInputStream         *stream,
                                                  gsize                 count,
                                                  GCancellable         *cancellable,
                                                  GError              **error);
static void     g_input_stream_real_read_async   (GInputStream         *stream,
                                                  void                 *buffer,
                                                  gsize                 count,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              user_data);
static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
static void     g_input_stream_real_skip_async   (GInputStream         *stream,
                                                  gsize                 count,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              data);
static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
static void     g_input_stream_real_close_async  (GInputStream         *stream,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              data);
static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
90
91 static void
g_input_stream_dispose(GObject * object)92 g_input_stream_dispose (GObject *object)
93 {
94 GInputStream *stream;
95
96 stream = G_INPUT_STREAM (object);
97
98 if (!stream->priv->closed)
99 g_input_stream_close (stream, NULL, NULL);
100
101 G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
102 }
103
104
105 static void
g_input_stream_class_init(GInputStreamClass * klass)106 g_input_stream_class_init (GInputStreamClass *klass)
107 {
108 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109
110 gobject_class->dispose = g_input_stream_dispose;
111
112 klass->skip = g_input_stream_real_skip;
113 klass->read_async = g_input_stream_real_read_async;
114 klass->read_finish = g_input_stream_real_read_finish;
115 klass->skip_async = g_input_stream_real_skip_async;
116 klass->skip_finish = g_input_stream_real_skip_finish;
117 klass->close_async = g_input_stream_real_close_async;
118 klass->close_finish = g_input_stream_real_close_finish;
119 }
120
121 static void
g_input_stream_init(GInputStream * stream)122 g_input_stream_init (GInputStream *stream)
123 {
124 stream->priv = g_input_stream_get_instance_private (stream);
125 }
126
127 /**
128 * g_input_stream_read:
129 * @stream: a #GInputStream.
130 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131 * a buffer to read data into (which should be at least count bytes long).
132 * @count: the number of bytes that will be read from the stream
133 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134 * @error: location to store the error occurring, or %NULL to ignore
135 *
136 * Tries to read @count bytes from the stream into the buffer starting at
137 * @buffer. Will block during this read.
138 *
139 * If count is zero returns zero and does nothing. A value of @count
140 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141 *
142 * On success, the number of bytes read into the buffer is returned.
143 * It is not an error if this is not the same as the requested size, as it
144 * can happen e.g. near the end of a file. Zero is returned on end of file
145 * (or if @count is zero), but never otherwise.
146 *
147 * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148 * at any position, and this function doesn't nul-terminate the @buffer.
149 *
150 * If @cancellable is not %NULL, then the operation can be cancelled by
151 * triggering the cancellable object from another thread. If the operation
152 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153 * operation was partially finished when the operation was cancelled the
154 * partial result will be returned, without an error.
155 *
156 * On error -1 is returned and @error is set accordingly.
157 *
158 * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
159 **/
160 gssize
g_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)161 g_input_stream_read (GInputStream *stream,
162 void *buffer,
163 gsize count,
164 GCancellable *cancellable,
165 GError **error)
166 {
167 GInputStreamClass *class;
168 gssize res;
169
170 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171 g_return_val_if_fail (buffer != NULL, 0);
172
173 if (count == 0)
174 return 0;
175
176 if (((gssize) count) < 0)
177 {
178 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179 _("Too large count value passed to %s"), G_STRFUNC);
180 return -1;
181 }
182
183 class = G_INPUT_STREAM_GET_CLASS (stream);
184
185 if (class->read_fn == NULL)
186 {
187 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188 _("Input stream doesn’t implement read"));
189 return -1;
190 }
191
192 if (!g_input_stream_set_pending (stream, error))
193 return -1;
194
195 if (cancellable)
196 g_cancellable_push_current (cancellable);
197
198 res = class->read_fn (stream, buffer, count, cancellable, error);
199
200 if (cancellable)
201 g_cancellable_pop_current (cancellable);
202
203 g_input_stream_clear_pending (stream);
204
205 return res;
206 }
207
208 /**
209 * g_input_stream_read_all:
210 * @stream: a #GInputStream.
211 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212 * a buffer to read data into (which should be at least count bytes long).
213 * @count: the number of bytes that will be read from the stream
214 * @bytes_read: (out): location to store the number of bytes that was read from the stream
215 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216 * @error: location to store the error occurring, or %NULL to ignore
217 *
218 * Tries to read @count bytes from the stream into the buffer starting at
219 * @buffer. Will block during this read.
220 *
221 * This function is similar to g_input_stream_read(), except it tries to
222 * read as many bytes as requested, only stopping on an error or end of stream.
223 *
224 * On a successful read of @count bytes, or if we reached the end of the
225 * stream, %TRUE is returned, and @bytes_read is set to the number of bytes
226 * read into @buffer.
227 *
228 * If there is an error during the operation %FALSE is returned and @error
229 * is set to indicate the error status.
230 *
231 * As a special exception to the normal conventions for functions that
232 * use #GError, if this function returns %FALSE (and sets @error) then
233 * @bytes_read will be set to the number of bytes that were successfully
234 * read before the error was encountered. This functionality is only
235 * available from C. If you need it from another language then you must
236 * write your own loop around g_input_stream_read().
237 *
238 * Returns: %TRUE on success, %FALSE if there was an error
239 **/
240 gboolean
g_input_stream_read_all(GInputStream * stream,void * buffer,gsize count,gsize * bytes_read,GCancellable * cancellable,GError ** error)241 g_input_stream_read_all (GInputStream *stream,
242 void *buffer,
243 gsize count,
244 gsize *bytes_read,
245 GCancellable *cancellable,
246 GError **error)
247 {
248 gsize _bytes_read;
249 gssize res;
250
251 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252 g_return_val_if_fail (buffer != NULL, FALSE);
253
254 _bytes_read = 0;
255 while (_bytes_read < count)
256 {
257 res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258 cancellable, error);
259 if (res == -1)
260 {
261 if (bytes_read)
262 *bytes_read = _bytes_read;
263 return FALSE;
264 }
265
266 if (res == 0)
267 break;
268
269 _bytes_read += res;
270 }
271
272 if (bytes_read)
273 *bytes_read = _bytes_read;
274 return TRUE;
275 }
276
277 /**
278 * g_input_stream_read_bytes:
279 * @stream: a #GInputStream.
280 * @count: maximum number of bytes that will be read from the stream. Common
281 * values include 4096 and 8192.
282 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283 * @error: location to store the error occurring, or %NULL to ignore
284 *
285 * Like g_input_stream_read(), this tries to read @count bytes from
286 * the stream in a blocking fashion. However, rather than reading into
287 * a user-supplied buffer, this will create a new #GBytes containing
288 * the data that was read. This may be easier to use from language
289 * bindings.
290 *
291 * If count is zero, returns a zero-length #GBytes and does nothing. A
292 * value of @count larger than %G_MAXSSIZE will cause a
293 * %G_IO_ERROR_INVALID_ARGUMENT error.
294 *
295 * On success, a new #GBytes is returned. It is not an error if the
296 * size of this object is not the same as the requested size, as it
297 * can happen e.g. near the end of a file. A zero-length #GBytes is
298 * returned on end of file (or if @count is zero), but never
299 * otherwise.
300 *
301 * If @cancellable is not %NULL, then the operation can be cancelled by
302 * triggering the cancellable object from another thread. If the operation
303 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304 * operation was partially finished when the operation was cancelled the
305 * partial result will be returned, without an error.
306 *
307 * On error %NULL is returned and @error is set accordingly.
308 *
309 * Returns: (transfer full): a new #GBytes, or %NULL on error
310 *
311 * Since: 2.34
312 **/
313 GBytes *
g_input_stream_read_bytes(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)314 g_input_stream_read_bytes (GInputStream *stream,
315 gsize count,
316 GCancellable *cancellable,
317 GError **error)
318 {
319 guchar *buf;
320 gssize nread;
321
322 buf = g_malloc (count);
323 nread = g_input_stream_read (stream, buf, count, cancellable, error);
324 if (nread == -1)
325 {
326 g_free (buf);
327 return NULL;
328 }
329 else if (nread == 0)
330 {
331 g_free (buf);
332 return g_bytes_new_static ("", 0);
333 }
334 else
335 return g_bytes_new_take (buf, nread);
336 }
337
338 /**
339 * g_input_stream_skip:
340 * @stream: a #GInputStream.
341 * @count: the number of bytes that will be skipped from the stream
342 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
343 * @error: location to store the error occurring, or %NULL to ignore
344 *
345 * Tries to skip @count bytes from the stream. Will block during the operation.
346 *
347 * This is identical to g_input_stream_read(), from a behaviour standpoint,
348 * but the bytes that are skipped are not returned to the user. Some
349 * streams have an implementation that is more efficient than reading the data.
350 *
351 * This function is optional for inherited classes, as the default implementation
352 * emulates it using read.
353 *
354 * If @cancellable is not %NULL, then the operation can be cancelled by
355 * triggering the cancellable object from another thread. If the operation
356 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357 * operation was partially finished when the operation was cancelled the
358 * partial result will be returned, without an error.
359 *
360 * Returns: Number of bytes skipped, or -1 on error
361 **/
362 gssize
g_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)363 g_input_stream_skip (GInputStream *stream,
364 gsize count,
365 GCancellable *cancellable,
366 GError **error)
367 {
368 GInputStreamClass *class;
369 gssize res;
370
371 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
372
373 if (count == 0)
374 return 0;
375
376 if (((gssize) count) < 0)
377 {
378 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379 _("Too large count value passed to %s"), G_STRFUNC);
380 return -1;
381 }
382
383 class = G_INPUT_STREAM_GET_CLASS (stream);
384
385 if (!g_input_stream_set_pending (stream, error))
386 return -1;
387
388 if (cancellable)
389 g_cancellable_push_current (cancellable);
390
391 res = class->skip (stream, count, cancellable, error);
392
393 if (cancellable)
394 g_cancellable_pop_current (cancellable);
395
396 g_input_stream_clear_pending (stream);
397
398 return res;
399 }
400
401 static gssize
g_input_stream_real_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)402 g_input_stream_real_skip (GInputStream *stream,
403 gsize count,
404 GCancellable *cancellable,
405 GError **error)
406 {
407 GInputStreamClass *class;
408 gssize ret, read_bytes;
409 char buffer[8192];
410 GError *my_error;
411
412 if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
413 {
414 GSeekable *seekable = G_SEEKABLE (stream);
415 goffset start, end;
416 gboolean success;
417
418 /* g_seekable_seek() may try to set pending itself */
419 stream->priv->pending = FALSE;
420
421 start = g_seekable_tell (seekable);
422
423 if (g_seekable_seek (G_SEEKABLE (stream),
424 0,
425 G_SEEK_END,
426 cancellable,
427 NULL))
428 {
429 end = g_seekable_tell (seekable);
430 g_assert (end >= start);
431 if (start > G_MAXSIZE - count || start + count > end)
432 {
433 stream->priv->pending = TRUE;
434 return end - start;
435 }
436
437 success = g_seekable_seek (G_SEEKABLE (stream),
438 start + count,
439 G_SEEK_SET,
440 cancellable,
441 error);
442 stream->priv->pending = TRUE;
443
444 if (success)
445 return count;
446 else
447 return -1;
448 }
449 }
450
451 /* If not seekable, or seek failed, fall back to reading data: */
452
453 class = G_INPUT_STREAM_GET_CLASS (stream);
454
455 read_bytes = 0;
456 while (1)
457 {
458 my_error = NULL;
459
460 ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
461 cancellable, &my_error);
462 if (ret == -1)
463 {
464 if (read_bytes > 0 &&
465 my_error->domain == G_IO_ERROR &&
466 my_error->code == G_IO_ERROR_CANCELLED)
467 {
468 g_error_free (my_error);
469 return read_bytes;
470 }
471
472 g_propagate_error (error, my_error);
473 return -1;
474 }
475
476 count -= ret;
477 read_bytes += ret;
478
479 if (ret == 0 || count == 0)
480 return read_bytes;
481 }
482 }
483
484 /**
485 * g_input_stream_close:
486 * @stream: A #GInputStream.
487 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
488 * @error: location to store the error occurring, or %NULL to ignore
489 *
490 * Closes the stream, releasing resources related to it.
491 *
492 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
493 * Closing a stream multiple times will not return an error.
494 *
495 * Streams will be automatically closed when the last reference
496 * is dropped, but you might want to call this function to make sure
497 * resources are released as early as possible.
498 *
499 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
500 * open after the stream is closed. See the documentation for the individual
501 * stream for details.
502 *
503 * On failure the first error that happened will be reported, but the close
504 * operation will finish as much as possible. A stream that failed to
505 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
506 * is important to check and report the error to the user.
507 *
508 * If @cancellable is not %NULL, then the operation can be cancelled by
509 * triggering the cancellable object from another thread. If the operation
510 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
511 * Cancelling a close will still leave the stream closed, but some streams
512 * can use a faster close that doesn't block to e.g. check errors.
513 *
514 * Returns: %TRUE on success, %FALSE on failure
515 **/
516 gboolean
g_input_stream_close(GInputStream * stream,GCancellable * cancellable,GError ** error)517 g_input_stream_close (GInputStream *stream,
518 GCancellable *cancellable,
519 GError **error)
520 {
521 GInputStreamClass *class;
522 gboolean res;
523
524 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
525
526 class = G_INPUT_STREAM_GET_CLASS (stream);
527
528 if (stream->priv->closed)
529 return TRUE;
530
531 res = TRUE;
532
533 if (!g_input_stream_set_pending (stream, error))
534 return FALSE;
535
536 if (cancellable)
537 g_cancellable_push_current (cancellable);
538
539 if (class->close_fn)
540 res = class->close_fn (stream, cancellable, error);
541
542 if (cancellable)
543 g_cancellable_pop_current (cancellable);
544
545 g_input_stream_clear_pending (stream);
546
547 stream->priv->closed = TRUE;
548
549 return res;
550 }
551
552 static void
async_ready_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)553 async_ready_callback_wrapper (GObject *source_object,
554 GAsyncResult *res,
555 gpointer user_data)
556 {
557 GInputStream *stream = G_INPUT_STREAM (source_object);
558
559 g_input_stream_clear_pending (stream);
560 if (stream->priv->outstanding_callback)
561 (*stream->priv->outstanding_callback) (source_object, res, user_data);
562 g_object_unref (stream);
563 }
564
565 static void
async_ready_close_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)566 async_ready_close_callback_wrapper (GObject *source_object,
567 GAsyncResult *res,
568 gpointer user_data)
569 {
570 GInputStream *stream = G_INPUT_STREAM (source_object);
571
572 g_input_stream_clear_pending (stream);
573 stream->priv->closed = TRUE;
574 if (stream->priv->outstanding_callback)
575 (*stream->priv->outstanding_callback) (source_object, res, user_data);
576 g_object_unref (stream);
577 }
578
579 /**
580 * g_input_stream_read_async:
581 * @stream: A #GInputStream.
582 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
583 * a buffer to read data into (which should be at least count bytes long).
584 * @count: the number of bytes that will be read from the stream
585 * @io_priority: the [I/O priority][io-priority]
586 * of the request.
587 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
588 * @callback: (scope async): callback to call when the request is satisfied
589 * @user_data: (closure): the data to pass to callback function
590 *
591 * Request an asynchronous read of @count bytes from the stream into the buffer
592 * starting at @buffer. When the operation is finished @callback will be called.
593 * You can then call g_input_stream_read_finish() to get the result of the
594 * operation.
595 *
596 * During an async request no other sync and async calls are allowed on @stream, and will
597 * result in %G_IO_ERROR_PENDING errors.
598 *
599 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
600 *
601 * On success, the number of bytes read into the buffer will be passed to the
602 * callback. It is not an error if this is not the same as the requested size, as it
603 * can happen e.g. near the end of a file, but generally we try to read
604 * as many bytes as requested. Zero is returned on end of file
605 * (or if @count is zero), but never otherwise.
606 *
607 * Any outstanding i/o request with higher priority (lower numerical value) will
608 * be executed before an outstanding request with lower priority. Default
609 * priority is %G_PRIORITY_DEFAULT.
610 *
611 * The asynchronous methods have a default fallback that uses threads to implement
612 * asynchronicity, so they are optional for inheriting classes. However, if you
613 * override one you must override all.
614 **/
615 void
g_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)616 g_input_stream_read_async (GInputStream *stream,
617 void *buffer,
618 gsize count,
619 int io_priority,
620 GCancellable *cancellable,
621 GAsyncReadyCallback callback,
622 gpointer user_data)
623 {
624 GInputStreamClass *class;
625 GError *error = NULL;
626
627 g_return_if_fail (G_IS_INPUT_STREAM (stream));
628 g_return_if_fail (buffer != NULL);
629
630 if (count == 0)
631 {
632 GTask *task;
633
634 task = g_task_new (stream, cancellable, callback, user_data);
635 g_task_set_source_tag (task, g_input_stream_read_async);
636 g_task_return_int (task, 0);
637 g_object_unref (task);
638 return;
639 }
640
641 if (((gssize) count) < 0)
642 {
643 g_task_report_new_error (stream, callback, user_data,
644 g_input_stream_read_async,
645 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
646 _("Too large count value passed to %s"),
647 G_STRFUNC);
648 return;
649 }
650
651 if (!g_input_stream_set_pending (stream, &error))
652 {
653 g_task_report_error (stream, callback, user_data,
654 g_input_stream_read_async,
655 error);
656 return;
657 }
658
659 class = G_INPUT_STREAM_GET_CLASS (stream);
660 stream->priv->outstanding_callback = callback;
661 g_object_ref (stream);
662 class->read_async (stream, buffer, count, io_priority, cancellable,
663 async_ready_callback_wrapper, user_data);
664 }
665
666 /**
667 * g_input_stream_read_finish:
668 * @stream: a #GInputStream.
669 * @result: a #GAsyncResult.
670 * @error: a #GError location to store the error occurring, or %NULL to
671 * ignore.
672 *
673 * Finishes an asynchronous stream read operation.
674 *
675 * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
676 **/
677 gssize
g_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)678 g_input_stream_read_finish (GInputStream *stream,
679 GAsyncResult *result,
680 GError **error)
681 {
682 GInputStreamClass *class;
683
684 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
685 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
686
687 if (g_async_result_legacy_propagate_error (result, error))
688 return -1;
689 else if (g_async_result_is_tagged (result, g_input_stream_read_async))
690 return g_task_propagate_int (G_TASK (result), error);
691
692 class = G_INPUT_STREAM_GET_CLASS (stream);
693 return class->read_finish (stream, result, error);
694 }
695
/* Progress state for g_input_stream_read_all_async(), attached to the GTask
 * as its task data. */
typedef struct
{
  /* Start of the caller-supplied destination buffer. */
  gchar *buffer;
  /* Bytes still to be read; starts at the requested count and shrinks as
   * reads complete. */
  gsize to_read;
  /* Bytes stored so far; also the offset into buffer for the next read. */
  gsize bytes_read;
} AsyncReadAll;
702
703 static void
free_async_read_all(gpointer data)704 free_async_read_all (gpointer data)
705 {
706 g_slice_free (AsyncReadAll, data);
707 }
708
709 static void
read_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)710 read_all_callback (GObject *stream,
711 GAsyncResult *result,
712 gpointer user_data)
713 {
714 GTask *task = user_data;
715 AsyncReadAll *data = g_task_get_task_data (task);
716 gboolean got_eof = FALSE;
717
718 if (result)
719 {
720 GError *error = NULL;
721 gssize nread;
722
723 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
724
725 if (nread == -1)
726 {
727 g_task_return_error (task, error);
728 g_object_unref (task);
729 return;
730 }
731
732 g_assert_cmpint (nread, <=, data->to_read);
733 data->to_read -= nread;
734 data->bytes_read += nread;
735 got_eof = (nread == 0);
736 }
737
738 if (got_eof || data->to_read == 0)
739 {
740 g_task_return_boolean (task, TRUE);
741 g_object_unref (task);
742 }
743
744 else
745 g_input_stream_read_async (G_INPUT_STREAM (stream),
746 data->buffer + data->bytes_read,
747 data->to_read,
748 g_task_get_priority (task),
749 g_task_get_cancellable (task),
750 read_all_callback, task);
751 }
752
753
754 static void
read_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)755 read_all_async_thread (GTask *task,
756 gpointer source_object,
757 gpointer task_data,
758 GCancellable *cancellable)
759 {
760 GInputStream *stream = source_object;
761 AsyncReadAll *data = task_data;
762 GError *error = NULL;
763
764 if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
765 g_task_get_cancellable (task), &error))
766 g_task_return_boolean (task, TRUE);
767 else
768 g_task_return_error (task, error);
769 }
770
771 /**
772 * g_input_stream_read_all_async:
773 * @stream: A #GInputStream
774 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
775 * a buffer to read data into (which should be at least count bytes long)
776 * @count: the number of bytes that will be read from the stream
777 * @io_priority: the [I/O priority][io-priority] of the request
778 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
779 * @callback: (scope async): callback to call when the request is satisfied
780 * @user_data: (closure): the data to pass to callback function
781 *
782 * Request an asynchronous read of @count bytes from the stream into the
783 * buffer starting at @buffer.
784 *
785 * This is the asynchronous equivalent of g_input_stream_read_all().
786 *
787 * Call g_input_stream_read_all_finish() to collect the result.
788 *
789 * Any outstanding I/O request with higher priority (lower numerical
790 * value) will be executed before an outstanding request with lower
791 * priority. Default priority is %G_PRIORITY_DEFAULT.
792 *
793 * Since: 2.44
794 **/
795 void
g_input_stream_read_all_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)796 g_input_stream_read_all_async (GInputStream *stream,
797 void *buffer,
798 gsize count,
799 int io_priority,
800 GCancellable *cancellable,
801 GAsyncReadyCallback callback,
802 gpointer user_data)
803 {
804 AsyncReadAll *data;
805 GTask *task;
806
807 g_return_if_fail (G_IS_INPUT_STREAM (stream));
808 g_return_if_fail (buffer != NULL || count == 0);
809
810 task = g_task_new (stream, cancellable, callback, user_data);
811 data = g_slice_new0 (AsyncReadAll);
812 data->buffer = buffer;
813 data->to_read = count;
814
815 g_task_set_source_tag (task, g_input_stream_read_all_async);
816 g_task_set_task_data (task, data, free_async_read_all);
817 g_task_set_priority (task, io_priority);
818
819 /* If async reads are going to be handled via the threadpool anyway
820 * then we may as well do it with a single dispatch instead of
821 * bouncing in and out.
822 */
823 if (g_input_stream_async_read_is_via_threads (stream))
824 {
825 g_task_run_in_thread (task, read_all_async_thread);
826 g_object_unref (task);
827 }
828 else
829 read_all_callback (G_OBJECT (stream), NULL, task);
830 }
831
832 /**
833 * g_input_stream_read_all_finish:
834 * @stream: a #GInputStream
835 * @result: a #GAsyncResult
836 * @bytes_read: (out): location to store the number of bytes that was read from the stream
837 * @error: a #GError location to store the error occurring, or %NULL to ignore
838 *
839 * Finishes an asynchronous stream read operation started with
840 * g_input_stream_read_all_async().
841 *
842 * As a special exception to the normal conventions for functions that
843 * use #GError, if this function returns %FALSE (and sets @error) then
844 * @bytes_read will be set to the number of bytes that were successfully
845 * read before the error was encountered. This functionality is only
846 * available from C. If you need it from another language then you must
847 * write your own loop around g_input_stream_read_async().
848 *
849 * Returns: %TRUE on success, %FALSE if there was an error
850 *
851 * Since: 2.44
852 **/
853 gboolean
g_input_stream_read_all_finish(GInputStream * stream,GAsyncResult * result,gsize * bytes_read,GError ** error)854 g_input_stream_read_all_finish (GInputStream *stream,
855 GAsyncResult *result,
856 gsize *bytes_read,
857 GError **error)
858 {
859 GTask *task;
860
861 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
862 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
863
864 task = G_TASK (result);
865
866 if (bytes_read)
867 {
868 AsyncReadAll *data = g_task_get_task_data (task);
869
870 *bytes_read = data->bytes_read;
871 }
872
873 return g_task_propagate_boolean (task, error);
874 }
875
876 static void
read_bytes_callback(GObject * stream,GAsyncResult * result,gpointer user_data)877 read_bytes_callback (GObject *stream,
878 GAsyncResult *result,
879 gpointer user_data)
880 {
881 GTask *task = user_data;
882 guchar *buf = g_task_get_task_data (task);
883 GError *error = NULL;
884 gssize nread;
885 GBytes *bytes = NULL;
886
887 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
888 result, &error);
889 if (nread == -1)
890 {
891 g_free (buf);
892 g_task_return_error (task, error);
893 }
894 else if (nread == 0)
895 {
896 g_free (buf);
897 bytes = g_bytes_new_static ("", 0);
898 }
899 else
900 bytes = g_bytes_new_take (buf, nread);
901
902 if (bytes)
903 g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
904
905 g_object_unref (task);
906 }
907
908 /**
909 * g_input_stream_read_bytes_async:
910 * @stream: A #GInputStream.
911 * @count: the number of bytes that will be read from the stream
912 * @io_priority: the [I/O priority][io-priority] of the request
913 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
914 * @callback: (scope async): callback to call when the request is satisfied
915 * @user_data: (closure): the data to pass to callback function
916 *
917 * Request an asynchronous read of @count bytes from the stream into a
918 * new #GBytes. When the operation is finished @callback will be
919 * called. You can then call g_input_stream_read_bytes_finish() to get the
920 * result of the operation.
921 *
922 * During an async request no other sync and async calls are allowed
923 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
924 *
925 * A value of @count larger than %G_MAXSSIZE will cause a
926 * %G_IO_ERROR_INVALID_ARGUMENT error.
927 *
928 * On success, the new #GBytes will be passed to the callback. It is
929 * not an error if this is smaller than the requested size, as it can
930 * happen e.g. near the end of a file, but generally we try to read as
931 * many bytes as requested. Zero is returned on end of file (or if
932 * @count is zero), but never otherwise.
933 *
934 * Any outstanding I/O request with higher priority (lower numerical
935 * value) will be executed before an outstanding request with lower
936 * priority. Default priority is %G_PRIORITY_DEFAULT.
937 *
938 * Since: 2.34
939 **/
940 void
g_input_stream_read_bytes_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)941 g_input_stream_read_bytes_async (GInputStream *stream,
942 gsize count,
943 int io_priority,
944 GCancellable *cancellable,
945 GAsyncReadyCallback callback,
946 gpointer user_data)
947 {
948 GTask *task;
949 guchar *buf;
950
951 task = g_task_new (stream, cancellable, callback, user_data);
952 g_task_set_source_tag (task, g_input_stream_read_bytes_async);
953
954 buf = g_malloc (count);
955 g_task_set_task_data (task, buf, NULL);
956
957 g_input_stream_read_async (stream, buf, count,
958 io_priority, cancellable,
959 read_bytes_callback, task);
960 }
961
962 /**
963 * g_input_stream_read_bytes_finish:
964 * @stream: a #GInputStream.
965 * @result: a #GAsyncResult.
966 * @error: a #GError location to store the error occurring, or %NULL to
967 * ignore.
968 *
969 * Finishes an asynchronous stream read-into-#GBytes operation.
970 *
971 * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
972 *
973 * Since: 2.34
974 **/
975 GBytes *
g_input_stream_read_bytes_finish(GInputStream * stream,GAsyncResult * result,GError ** error)976 g_input_stream_read_bytes_finish (GInputStream *stream,
977 GAsyncResult *result,
978 GError **error)
979 {
980 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
981 g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
982
983 return g_task_propagate_pointer (G_TASK (result), error);
984 }
985
986 /**
987 * g_input_stream_skip_async:
988 * @stream: A #GInputStream.
989 * @count: the number of bytes that will be skipped from the stream
990 * @io_priority: the [I/O priority][io-priority] of the request
991 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
992 * @callback: (scope async): callback to call when the request is satisfied
993 * @user_data: (closure): the data to pass to callback function
994 *
995 * Request an asynchronous skip of @count bytes from the stream.
996 * When the operation is finished @callback will be called.
997 * You can then call g_input_stream_skip_finish() to get the result
998 * of the operation.
999 *
1000 * During an async request no other sync and async calls are allowed,
1001 * and will result in %G_IO_ERROR_PENDING errors.
1002 *
1003 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1004 *
1005 * On success, the number of bytes skipped will be passed to the callback.
1006 * It is not an error if this is not the same as the requested size, as it
1007 * can happen e.g. near the end of a file, but generally we try to skip
1008 * as many bytes as requested. Zero is returned on end of file
1009 * (or if @count is zero), but never otherwise.
1010 *
1011 * Any outstanding i/o request with higher priority (lower numerical value)
1012 * will be executed before an outstanding request with lower priority.
1013 * Default priority is %G_PRIORITY_DEFAULT.
1014 *
1015 * The asynchronous methods have a default fallback that uses threads to
1016 * implement asynchronicity, so they are optional for inheriting classes.
1017 * However, if you override one, you must override all.
1018 **/
1019 void
g_input_stream_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1020 g_input_stream_skip_async (GInputStream *stream,
1021 gsize count,
1022 int io_priority,
1023 GCancellable *cancellable,
1024 GAsyncReadyCallback callback,
1025 gpointer user_data)
1026 {
1027 GInputStreamClass *class;
1028 GError *error = NULL;
1029
1030 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1031
1032 if (count == 0)
1033 {
1034 GTask *task;
1035
1036 task = g_task_new (stream, cancellable, callback, user_data);
1037 g_task_set_source_tag (task, g_input_stream_skip_async);
1038 g_task_return_int (task, 0);
1039 g_object_unref (task);
1040 return;
1041 }
1042
1043 if (((gssize) count) < 0)
1044 {
1045 g_task_report_new_error (stream, callback, user_data,
1046 g_input_stream_skip_async,
1047 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1048 _("Too large count value passed to %s"),
1049 G_STRFUNC);
1050 return;
1051 }
1052
1053 if (!g_input_stream_set_pending (stream, &error))
1054 {
1055 g_task_report_error (stream, callback, user_data,
1056 g_input_stream_skip_async,
1057 error);
1058 return;
1059 }
1060
1061 class = G_INPUT_STREAM_GET_CLASS (stream);
1062 stream->priv->outstanding_callback = callback;
1063 g_object_ref (stream);
1064 class->skip_async (stream, count, io_priority, cancellable,
1065 async_ready_callback_wrapper, user_data);
1066 }
1067
1068 /**
1069 * g_input_stream_skip_finish:
1070 * @stream: a #GInputStream.
1071 * @result: a #GAsyncResult.
1072 * @error: a #GError location to store the error occurring, or %NULL to
1073 * ignore.
1074 *
1075 * Finishes a stream skip operation.
1076 *
 * Returns: the number of bytes skipped, or `-1` on error.
1078 **/
1079 gssize
g_input_stream_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1080 g_input_stream_skip_finish (GInputStream *stream,
1081 GAsyncResult *result,
1082 GError **error)
1083 {
1084 GInputStreamClass *class;
1085
1086 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1087 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1088
1089 if (g_async_result_legacy_propagate_error (result, error))
1090 return -1;
1091 else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1092 return g_task_propagate_int (G_TASK (result), error);
1093
1094 class = G_INPUT_STREAM_GET_CLASS (stream);
1095 return class->skip_finish (stream, result, error);
1096 }
1097
1098 /**
1099 * g_input_stream_close_async:
1100 * @stream: A #GInputStream.
1101 * @io_priority: the [I/O priority][io-priority] of the request
1102 * @cancellable: (nullable): optional cancellable object
1103 * @callback: (scope async): callback to call when the request is satisfied
1104 * @user_data: (closure): the data to pass to callback function
1105 *
 * Requests an asynchronous close of the stream, releasing resources related to it.
1107 * When the operation is finished @callback will be called.
1108 * You can then call g_input_stream_close_finish() to get the result of the
1109 * operation.
1110 *
1111 * For behaviour details see g_input_stream_close().
1112 *
1113 * The asynchronous methods have a default fallback that uses threads to implement
1114 * asynchronicity, so they are optional for inheriting classes. However, if you
1115 * override one you must override all.
1116 **/
1117 void
g_input_stream_close_async(GInputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1118 g_input_stream_close_async (GInputStream *stream,
1119 int io_priority,
1120 GCancellable *cancellable,
1121 GAsyncReadyCallback callback,
1122 gpointer user_data)
1123 {
1124 GInputStreamClass *class;
1125 GError *error = NULL;
1126
1127 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1128
1129 if (stream->priv->closed)
1130 {
1131 GTask *task;
1132
1133 task = g_task_new (stream, cancellable, callback, user_data);
1134 g_task_set_source_tag (task, g_input_stream_close_async);
1135 g_task_return_boolean (task, TRUE);
1136 g_object_unref (task);
1137 return;
1138 }
1139
1140 if (!g_input_stream_set_pending (stream, &error))
1141 {
1142 g_task_report_error (stream, callback, user_data,
1143 g_input_stream_close_async,
1144 error);
1145 return;
1146 }
1147
1148 class = G_INPUT_STREAM_GET_CLASS (stream);
1149 stream->priv->outstanding_callback = callback;
1150 g_object_ref (stream);
1151 class->close_async (stream, io_priority, cancellable,
1152 async_ready_close_callback_wrapper, user_data);
1153 }
1154
1155 /**
1156 * g_input_stream_close_finish:
1157 * @stream: a #GInputStream.
1158 * @result: a #GAsyncResult.
1159 * @error: a #GError location to store the error occurring, or %NULL to
1160 * ignore.
1161 *
1162 * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1163 *
1164 * Returns: %TRUE if the stream was closed successfully.
1165 **/
1166 gboolean
g_input_stream_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1167 g_input_stream_close_finish (GInputStream *stream,
1168 GAsyncResult *result,
1169 GError **error)
1170 {
1171 GInputStreamClass *class;
1172
1173 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1174 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1175
1176 if (g_async_result_legacy_propagate_error (result, error))
1177 return FALSE;
1178 else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1179 return g_task_propagate_boolean (G_TASK (result), error);
1180
1181 class = G_INPUT_STREAM_GET_CLASS (stream);
1182 return class->close_finish (stream, result, error);
1183 }
1184
1185 /**
1186 * g_input_stream_is_closed:
1187 * @stream: input stream.
1188 *
1189 * Checks if an input stream is closed.
1190 *
1191 * Returns: %TRUE if the stream is closed.
1192 **/
1193 gboolean
g_input_stream_is_closed(GInputStream * stream)1194 g_input_stream_is_closed (GInputStream *stream)
1195 {
1196 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1197
1198 return stream->priv->closed;
1199 }
1200
1201 /**
1202 * g_input_stream_has_pending:
1203 * @stream: input stream.
1204 *
1205 * Checks if an input stream has pending actions.
1206 *
1207 * Returns: %TRUE if @stream has pending actions.
1208 **/
1209 gboolean
g_input_stream_has_pending(GInputStream * stream)1210 g_input_stream_has_pending (GInputStream *stream)
1211 {
1212 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1213
1214 return stream->priv->pending;
1215 }
1216
1217 /**
1218 * g_input_stream_set_pending:
1219 * @stream: input stream
1220 * @error: a #GError location to store the error occurring, or %NULL to
1221 * ignore.
1222 *
1223 * Sets @stream to have actions pending. If the pending flag is
1224 * already set or @stream is closed, it will return %FALSE and set
1225 * @error.
1226 *
1227 * Returns: %TRUE if pending was previously unset and is now set.
1228 **/
1229 gboolean
g_input_stream_set_pending(GInputStream * stream,GError ** error)1230 g_input_stream_set_pending (GInputStream *stream, GError **error)
1231 {
1232 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1233
1234 if (stream->priv->closed)
1235 {
1236 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1237 _("Stream is already closed"));
1238 return FALSE;
1239 }
1240
1241 if (stream->priv->pending)
1242 {
1243 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1244 /* Translators: This is an error you get if there is already an
1245 * operation running against this stream when you try to start
1246 * one */
1247 _("Stream has outstanding operation"));
1248 return FALSE;
1249 }
1250
1251 stream->priv->pending = TRUE;
1252 return TRUE;
1253 }
1254
1255 /**
1256 * g_input_stream_clear_pending:
1257 * @stream: input stream
1258 *
1259 * Clears the pending flag on @stream.
1260 **/
1261 void
g_input_stream_clear_pending(GInputStream * stream)1262 g_input_stream_clear_pending (GInputStream *stream)
1263 {
1264 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1265
1266 stream->priv->pending = FALSE;
1267 }
1268
1269 /*< internal >
1270 * g_input_stream_async_read_is_via_threads:
1271 * @stream: input stream
1272 *
1273 * Checks if an input stream's read_async function uses threads.
1274 *
1275 * Returns: %TRUE if @stream's read_async function uses threads.
1276 **/
1277 gboolean
g_input_stream_async_read_is_via_threads(GInputStream * stream)1278 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1279 {
1280 GInputStreamClass *class;
1281
1282 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1283
1284 class = G_INPUT_STREAM_GET_CLASS (stream);
1285
1286 return (class->read_async == g_input_stream_real_read_async &&
1287 !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1288 g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1289 }
1290
1291 /*< internal >
1292 * g_input_stream_async_close_is_via_threads:
1293 * @stream: input stream
1294 *
1295 * Checks if an input stream's close_async function uses threads.
1296 *
1297 * Returns: %TRUE if @stream's close_async function uses threads.
1298 **/
1299 gboolean
g_input_stream_async_close_is_via_threads(GInputStream * stream)1300 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1301 {
1302 GInputStreamClass *class;
1303
1304 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1305
1306 class = G_INPUT_STREAM_GET_CLASS (stream);
1307
1308 return class->close_async == g_input_stream_real_close_async;
1309 }
1310
1311 /********************************************
1312 * Default implementation of async ops *
1313 ********************************************/
1314
1315 typedef struct {
1316 void *buffer;
1317 gsize count;
1318 } ReadData;
1319
1320 static void
free_read_data(ReadData * op)1321 free_read_data (ReadData *op)
1322 {
1323 g_slice_free (ReadData, op);
1324 }
1325
1326 static void
read_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1327 read_async_thread (GTask *task,
1328 gpointer source_object,
1329 gpointer task_data,
1330 GCancellable *cancellable)
1331 {
1332 GInputStream *stream = source_object;
1333 ReadData *op = task_data;
1334 GInputStreamClass *class;
1335 GError *error = NULL;
1336 gssize nread;
1337
1338 class = G_INPUT_STREAM_GET_CLASS (stream);
1339
1340 nread = class->read_fn (stream,
1341 op->buffer, op->count,
1342 g_task_get_cancellable (task),
1343 &error);
1344 if (nread == -1)
1345 g_task_return_error (task, error);
1346 else
1347 g_task_return_int (task, nread);
1348 }
1349
1350 static void read_async_pollable (GPollableInputStream *stream,
1351 GTask *task);
1352
1353 static gboolean
read_async_pollable_ready(GPollableInputStream * stream,gpointer user_data)1354 read_async_pollable_ready (GPollableInputStream *stream,
1355 gpointer user_data)
1356 {
1357 GTask *task = user_data;
1358
1359 read_async_pollable (stream, task);
1360 return FALSE;
1361 }
1362
1363 static void
read_async_pollable(GPollableInputStream * stream,GTask * task)1364 read_async_pollable (GPollableInputStream *stream,
1365 GTask *task)
1366 {
1367 ReadData *op = g_task_get_task_data (task);
1368 GError *error = NULL;
1369 gssize nread;
1370
1371 if (g_task_return_error_if_cancelled (task))
1372 return;
1373
1374 nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1375 read_nonblocking (stream, op->buffer, op->count, &error);
1376
1377 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1378 {
1379 GSource *source;
1380
1381 g_error_free (error);
1382
1383 source = g_pollable_input_stream_create_source (stream,
1384 g_task_get_cancellable (task));
1385 g_task_attach_source (task, source,
1386 (GSourceFunc) read_async_pollable_ready);
1387 g_source_unref (source);
1388 return;
1389 }
1390
1391 if (nread == -1)
1392 g_task_return_error (task, error);
1393 else
1394 g_task_return_int (task, nread);
1395 /* g_input_stream_real_read_async() unrefs task */
1396 }
1397
1398
1399 static void
g_input_stream_real_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1400 g_input_stream_real_read_async (GInputStream *stream,
1401 void *buffer,
1402 gsize count,
1403 int io_priority,
1404 GCancellable *cancellable,
1405 GAsyncReadyCallback callback,
1406 gpointer user_data)
1407 {
1408 GTask *task;
1409 ReadData *op;
1410
1411 op = g_slice_new0 (ReadData);
1412 task = g_task_new (stream, cancellable, callback, user_data);
1413 g_task_set_source_tag (task, g_input_stream_real_read_async);
1414 g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1415 g_task_set_priority (task, io_priority);
1416 op->buffer = buffer;
1417 op->count = count;
1418
1419 if (!g_input_stream_async_read_is_via_threads (stream))
1420 read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1421 else
1422 g_task_run_in_thread (task, read_async_thread);
1423 g_object_unref (task);
1424 }
1425
1426 static gssize
g_input_stream_real_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1427 g_input_stream_real_read_finish (GInputStream *stream,
1428 GAsyncResult *result,
1429 GError **error)
1430 {
1431 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1432
1433 return g_task_propagate_int (G_TASK (result), error);
1434 }
1435
1436
1437 static void
skip_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1438 skip_async_thread (GTask *task,
1439 gpointer source_object,
1440 gpointer task_data,
1441 GCancellable *cancellable)
1442 {
1443 GInputStream *stream = source_object;
1444 gsize count = GPOINTER_TO_SIZE (task_data);
1445 GInputStreamClass *class;
1446 GError *error = NULL;
1447 gssize ret;
1448
1449 class = G_INPUT_STREAM_GET_CLASS (stream);
1450 ret = class->skip (stream, count,
1451 g_task_get_cancellable (task),
1452 &error);
1453 if (ret == -1)
1454 g_task_return_error (task, error);
1455 else
1456 g_task_return_int (task, ret);
1457 }
1458
1459 typedef struct {
1460 char buffer[8192];
1461 gsize count;
1462 gsize count_skipped;
1463 } SkipFallbackAsyncData;
1464
1465 static void
skip_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1466 skip_callback_wrapper (GObject *source_object,
1467 GAsyncResult *res,
1468 gpointer user_data)
1469 {
1470 GInputStreamClass *class;
1471 GTask *task = user_data;
1472 SkipFallbackAsyncData *data = g_task_get_task_data (task);
1473 GError *error = NULL;
1474 gssize ret;
1475
1476 ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1477
1478 if (ret > 0)
1479 {
1480 data->count -= ret;
1481 data->count_skipped += ret;
1482
1483 if (data->count > 0)
1484 {
1485 class = G_INPUT_STREAM_GET_CLASS (source_object);
1486 class->read_async (G_INPUT_STREAM (source_object),
1487 data->buffer, MIN (8192, data->count),
1488 g_task_get_priority (task),
1489 g_task_get_cancellable (task),
1490 skip_callback_wrapper, task);
1491 return;
1492 }
1493 }
1494
1495 if (ret == -1 &&
1496 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1497 data->count_skipped)
1498 {
1499 /* No error, return partial read */
1500 g_clear_error (&error);
1501 }
1502
1503 if (error)
1504 g_task_return_error (task, error);
1505 else
1506 g_task_return_int (task, data->count_skipped);
1507 g_object_unref (task);
1508 }
1509
1510 static void
g_input_stream_real_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1511 g_input_stream_real_skip_async (GInputStream *stream,
1512 gsize count,
1513 int io_priority,
1514 GCancellable *cancellable,
1515 GAsyncReadyCallback callback,
1516 gpointer user_data)
1517 {
1518 GInputStreamClass *class;
1519 SkipFallbackAsyncData *data;
1520 GTask *task;
1521
1522 class = G_INPUT_STREAM_GET_CLASS (stream);
1523
1524 task = g_task_new (stream, cancellable, callback, user_data);
1525 g_task_set_source_tag (task, g_input_stream_real_skip_async);
1526 g_task_set_priority (task, io_priority);
1527
1528 if (g_input_stream_async_read_is_via_threads (stream))
1529 {
1530 /* Read is thread-using async fallback.
1531 * Make skip use threads too, so that we can use a possible sync skip
1532 * implementation. */
1533 g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1534
1535 g_task_run_in_thread (task, skip_async_thread);
1536 g_object_unref (task);
1537 }
1538 else
1539 {
1540 /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1541
1542 /* There is a custom async read function, lets use that. */
1543 data = g_new (SkipFallbackAsyncData, 1);
1544 data->count = count;
1545 data->count_skipped = 0;
1546 g_task_set_task_data (task, data, g_free);
1547 g_task_set_check_cancellable (task, FALSE);
1548 class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1549 skip_callback_wrapper, task);
1550 }
1551
1552 }
1553
1554 static gssize
g_input_stream_real_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1555 g_input_stream_real_skip_finish (GInputStream *stream,
1556 GAsyncResult *result,
1557 GError **error)
1558 {
1559 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1560
1561 return g_task_propagate_int (G_TASK (result), error);
1562 }
1563
1564 static void
close_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1565 close_async_thread (GTask *task,
1566 gpointer source_object,
1567 gpointer task_data,
1568 GCancellable *cancellable)
1569 {
1570 GInputStream *stream = source_object;
1571 GInputStreamClass *class;
1572 GError *error = NULL;
1573 gboolean result;
1574
1575 class = G_INPUT_STREAM_GET_CLASS (stream);
1576 if (class->close_fn)
1577 {
1578 result = class->close_fn (stream,
1579 g_task_get_cancellable (task),
1580 &error);
1581 if (!result)
1582 {
1583 g_task_return_error (task, error);
1584 return;
1585 }
1586 }
1587
1588 g_task_return_boolean (task, TRUE);
1589 }
1590
1591 static void
g_input_stream_real_close_async(GInputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1592 g_input_stream_real_close_async (GInputStream *stream,
1593 int io_priority,
1594 GCancellable *cancellable,
1595 GAsyncReadyCallback callback,
1596 gpointer user_data)
1597 {
1598 GTask *task;
1599
1600 task = g_task_new (stream, cancellable, callback, user_data);
1601 g_task_set_source_tag (task, g_input_stream_real_close_async);
1602 g_task_set_check_cancellable (task, FALSE);
1603 g_task_set_priority (task, io_priority);
1604
1605 g_task_run_in_thread (task, close_async_thread);
1606 g_object_unref (task);
1607 }
1608
1609 static gboolean
g_input_stream_real_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1610 g_input_stream_real_close_finish (GInputStream *stream,
1611 GAsyncResult *result,
1612 GError **error)
1613 {
1614 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1615
1616 return g_task_propagate_boolean (G_TASK (result), error);
1617 }
1618