• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20 
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24 
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32 
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()).
41  *
42  * To copy the content of an input stream to an output stream without
43  * manually handling the reads and writes, use g_output_stream_splice().
44  *
45  * See the documentation for #GIOStream for details of thread safety of
46  * streaming APIs.
47  *
48  * All of these functions have async variants too.
49  **/
50 
51 struct _GInputStreamPrivate {
52   guint closed : 1;
53   guint pending : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56 
57 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
58 
59 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
60 						  gsize                 count,
61 						  GCancellable         *cancellable,
62 						  GError              **error);
63 static void     g_input_stream_real_read_async   (GInputStream         *stream,
64 						  void                 *buffer,
65 						  gsize                 count,
66 						  int                   io_priority,
67 						  GCancellable         *cancellable,
68 						  GAsyncReadyCallback   callback,
69 						  gpointer              user_data);
70 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
71 						  GAsyncResult         *result,
72 						  GError              **error);
73 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
74 						  gsize                 count,
75 						  int                   io_priority,
76 						  GCancellable         *cancellable,
77 						  GAsyncReadyCallback   callback,
78 						  gpointer              data);
79 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
80 						  GAsyncResult         *result,
81 						  GError              **error);
82 static void     g_input_stream_real_close_async  (GInputStream         *stream,
83 						  int                   io_priority,
84 						  GCancellable         *cancellable,
85 						  GAsyncReadyCallback   callback,
86 						  gpointer              data);
87 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
88 						  GAsyncResult         *result,
89 						  GError              **error);
90 
91 static void
g_input_stream_dispose(GObject * object)92 g_input_stream_dispose (GObject *object)
93 {
94   GInputStream *stream;
95 
96   stream = G_INPUT_STREAM (object);
97 
98   if (!stream->priv->closed)
99     g_input_stream_close (stream, NULL, NULL);
100 
101   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
102 }
103 
104 
105 static void
g_input_stream_class_init(GInputStreamClass * klass)106 g_input_stream_class_init (GInputStreamClass *klass)
107 {
108   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109 
110   gobject_class->dispose = g_input_stream_dispose;
111 
112   klass->skip = g_input_stream_real_skip;
113   klass->read_async = g_input_stream_real_read_async;
114   klass->read_finish = g_input_stream_real_read_finish;
115   klass->skip_async = g_input_stream_real_skip_async;
116   klass->skip_finish = g_input_stream_real_skip_finish;
117   klass->close_async = g_input_stream_real_close_async;
118   klass->close_finish = g_input_stream_real_close_finish;
119 }
120 
121 static void
g_input_stream_init(GInputStream * stream)122 g_input_stream_init (GInputStream *stream)
123 {
124   stream->priv = g_input_stream_get_instance_private (stream);
125 }
126 
127 /**
128  * g_input_stream_read:
129  * @stream: a #GInputStream.
130  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131  *     a buffer to read data into (which should be at least count bytes long).
132  * @count: the number of bytes that will be read from the stream
133  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134  * @error: location to store the error occurring, or %NULL to ignore
135  *
136  * Tries to read @count bytes from the stream into the buffer starting at
137  * @buffer. Will block during this read.
138  *
139  * If count is zero returns zero and does nothing. A value of @count
140  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141  *
142  * On success, the number of bytes read into the buffer is returned.
143  * It is not an error if this is not the same as the requested size, as it
144  * can happen e.g. near the end of a file. Zero is returned on end of file
145  * (or if @count is zero),  but never otherwise.
146  *
147  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148  * at any position, and this function doesn't nul-terminate the @buffer.
149  *
150  * If @cancellable is not %NULL, then the operation can be cancelled by
151  * triggering the cancellable object from another thread. If the operation
152  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153  * operation was partially finished when the operation was cancelled the
154  * partial result will be returned, without an error.
155  *
156  * On error -1 is returned and @error is set accordingly.
157  *
158  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
159  **/
160 gssize
g_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)161 g_input_stream_read  (GInputStream  *stream,
162 		      void          *buffer,
163 		      gsize          count,
164 		      GCancellable  *cancellable,
165 		      GError       **error)
166 {
167   GInputStreamClass *class;
168   gssize res;
169 
170   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171   g_return_val_if_fail (buffer != NULL, 0);
172 
173   if (count == 0)
174     return 0;
175 
176   if (((gssize) count) < 0)
177     {
178       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179 		   _("Too large count value passed to %s"), G_STRFUNC);
180       return -1;
181     }
182 
183   class = G_INPUT_STREAM_GET_CLASS (stream);
184 
185   if (class->read_fn == NULL)
186     {
187       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188                            _("Input stream doesn’t implement read"));
189       return -1;
190     }
191 
192   if (!g_input_stream_set_pending (stream, error))
193     return -1;
194 
195   if (cancellable)
196     g_cancellable_push_current (cancellable);
197 
198   res = class->read_fn (stream, buffer, count, cancellable, error);
199 
200   if (cancellable)
201     g_cancellable_pop_current (cancellable);
202 
203   g_input_stream_clear_pending (stream);
204 
205   return res;
206 }
207 
208 /**
209  * g_input_stream_read_all:
210  * @stream: a #GInputStream.
211  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212  *     a buffer to read data into (which should be at least count bytes long).
213  * @count: the number of bytes that will be read from the stream
214  * @bytes_read: (out): location to store the number of bytes that was read from the stream
215  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216  * @error: location to store the error occurring, or %NULL to ignore
217  *
218  * Tries to read @count bytes from the stream into the buffer starting at
219  * @buffer. Will block during this read.
220  *
221  * This function is similar to g_input_stream_read(), except it tries to
222  * read as many bytes as requested, only stopping on an error or end of stream.
223  *
224  * On a successful read of @count bytes, or if we reached the end of the
225  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
226  * read into @buffer.
227  *
228  * If there is an error during the operation %FALSE is returned and @error
229  * is set to indicate the error status.
230  *
231  * As a special exception to the normal conventions for functions that
232  * use #GError, if this function returns %FALSE (and sets @error) then
233  * @bytes_read will be set to the number of bytes that were successfully
234  * read before the error was encountered.  This functionality is only
235  * available from C.  If you need it from another language then you must
236  * write your own loop around g_input_stream_read().
237  *
238  * Returns: %TRUE on success, %FALSE if there was an error
239  **/
240 gboolean
g_input_stream_read_all(GInputStream * stream,void * buffer,gsize count,gsize * bytes_read,GCancellable * cancellable,GError ** error)241 g_input_stream_read_all (GInputStream  *stream,
242 			 void          *buffer,
243 			 gsize          count,
244 			 gsize         *bytes_read,
245 			 GCancellable  *cancellable,
246 			 GError       **error)
247 {
248   gsize _bytes_read;
249   gssize res;
250 
251   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252   g_return_val_if_fail (buffer != NULL, FALSE);
253 
254   _bytes_read = 0;
255   while (_bytes_read < count)
256     {
257       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258 				 cancellable, error);
259       if (res == -1)
260 	{
261 	  if (bytes_read)
262 	    *bytes_read = _bytes_read;
263 	  return FALSE;
264 	}
265 
266       if (res == 0)
267 	break;
268 
269       _bytes_read += res;
270     }
271 
272   if (bytes_read)
273     *bytes_read = _bytes_read;
274   return TRUE;
275 }
276 
277 /**
278  * g_input_stream_read_bytes:
279  * @stream: a #GInputStream.
280  * @count: maximum number of bytes that will be read from the stream. Common
281  * values include 4096 and 8192.
282  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283  * @error: location to store the error occurring, or %NULL to ignore
284  *
285  * Like g_input_stream_read(), this tries to read @count bytes from
286  * the stream in a blocking fashion. However, rather than reading into
287  * a user-supplied buffer, this will create a new #GBytes containing
288  * the data that was read. This may be easier to use from language
289  * bindings.
290  *
291  * If count is zero, returns a zero-length #GBytes and does nothing. A
292  * value of @count larger than %G_MAXSSIZE will cause a
293  * %G_IO_ERROR_INVALID_ARGUMENT error.
294  *
295  * On success, a new #GBytes is returned. It is not an error if the
296  * size of this object is not the same as the requested size, as it
297  * can happen e.g. near the end of a file. A zero-length #GBytes is
298  * returned on end of file (or if @count is zero), but never
299  * otherwise.
300  *
301  * If @cancellable is not %NULL, then the operation can be cancelled by
302  * triggering the cancellable object from another thread. If the operation
303  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304  * operation was partially finished when the operation was cancelled the
305  * partial result will be returned, without an error.
306  *
307  * On error %NULL is returned and @error is set accordingly.
308  *
309  * Returns: (transfer full): a new #GBytes, or %NULL on error
310  *
311  * Since: 2.34
312  **/
313 GBytes *
g_input_stream_read_bytes(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)314 g_input_stream_read_bytes (GInputStream  *stream,
315 			   gsize          count,
316 			   GCancellable  *cancellable,
317 			   GError       **error)
318 {
319   guchar *buf;
320   gssize nread;
321 
322   buf = g_malloc (count);
323   nread = g_input_stream_read (stream, buf, count, cancellable, error);
324   if (nread == -1)
325     {
326       g_free (buf);
327       return NULL;
328     }
329   else if (nread == 0)
330     {
331       g_free (buf);
332       return g_bytes_new_static ("", 0);
333     }
334   else
335     return g_bytes_new_take (buf, nread);
336 }
337 
338 /**
339  * g_input_stream_skip:
340  * @stream: a #GInputStream.
341  * @count: the number of bytes that will be skipped from the stream
342  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
343  * @error: location to store the error occurring, or %NULL to ignore
344  *
345  * Tries to skip @count bytes from the stream. Will block during the operation.
346  *
347  * This is identical to g_input_stream_read(), from a behaviour standpoint,
348  * but the bytes that are skipped are not returned to the user. Some
349  * streams have an implementation that is more efficient than reading the data.
350  *
351  * This function is optional for inherited classes, as the default implementation
352  * emulates it using read.
353  *
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357  * operation was partially finished when the operation was cancelled the
358  * partial result will be returned, without an error.
359  *
360  * Returns: Number of bytes skipped, or -1 on error
361  **/
362 gssize
g_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)363 g_input_stream_skip (GInputStream  *stream,
364 		     gsize          count,
365 		     GCancellable  *cancellable,
366 		     GError       **error)
367 {
368   GInputStreamClass *class;
369   gssize res;
370 
371   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
372 
373   if (count == 0)
374     return 0;
375 
376   if (((gssize) count) < 0)
377     {
378       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379 		   _("Too large count value passed to %s"), G_STRFUNC);
380       return -1;
381     }
382 
383   class = G_INPUT_STREAM_GET_CLASS (stream);
384 
385   if (!g_input_stream_set_pending (stream, error))
386     return -1;
387 
388   if (cancellable)
389     g_cancellable_push_current (cancellable);
390 
391   res = class->skip (stream, count, cancellable, error);
392 
393   if (cancellable)
394     g_cancellable_pop_current (cancellable);
395 
396   g_input_stream_clear_pending (stream);
397 
398   return res;
399 }
400 
401 static gssize
g_input_stream_real_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)402 g_input_stream_real_skip (GInputStream  *stream,
403 			  gsize          count,
404 			  GCancellable  *cancellable,
405 			  GError       **error)
406 {
407   GInputStreamClass *class;
408   gssize ret, read_bytes;
409   char buffer[8192];
410   GError *my_error;
411 
412   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
413     {
414       GSeekable *seekable = G_SEEKABLE (stream);
415       goffset start, end;
416       gboolean success;
417 
418       /* g_seekable_seek() may try to set pending itself */
419       stream->priv->pending = FALSE;
420 
421       start = g_seekable_tell (seekable);
422 
423       if (g_seekable_seek (G_SEEKABLE (stream),
424                            0,
425                            G_SEEK_END,
426                            cancellable,
427                            NULL))
428         {
429           end = g_seekable_tell (seekable);
430           g_assert (end >= start);
431           if (start > G_MAXSIZE - count || start + count > end)
432             {
433               stream->priv->pending = TRUE;
434               return end - start;
435             }
436 
437           success = g_seekable_seek (G_SEEKABLE (stream),
438                                      start + count,
439                                      G_SEEK_SET,
440                                      cancellable,
441                                      error);
442           stream->priv->pending = TRUE;
443 
444           if (success)
445             return count;
446           else
447             return -1;
448         }
449     }
450 
451   /* If not seekable, or seek failed, fall back to reading data: */
452 
453   class = G_INPUT_STREAM_GET_CLASS (stream);
454 
455   read_bytes = 0;
456   while (1)
457     {
458       my_error = NULL;
459 
460       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
461                             cancellable, &my_error);
462       if (ret == -1)
463 	{
464 	  if (read_bytes > 0 &&
465 	      my_error->domain == G_IO_ERROR &&
466 	      my_error->code == G_IO_ERROR_CANCELLED)
467 	    {
468 	      g_error_free (my_error);
469 	      return read_bytes;
470 	    }
471 
472 	  g_propagate_error (error, my_error);
473 	  return -1;
474 	}
475 
476       count -= ret;
477       read_bytes += ret;
478 
479       if (ret == 0 || count == 0)
480         return read_bytes;
481     }
482 }
483 
484 /**
485  * g_input_stream_close:
486  * @stream: A #GInputStream.
487  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
488  * @error: location to store the error occurring, or %NULL to ignore
489  *
490  * Closes the stream, releasing resources related to it.
491  *
492  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
493  * Closing a stream multiple times will not return an error.
494  *
495  * Streams will be automatically closed when the last reference
496  * is dropped, but you might want to call this function to make sure
497  * resources are released as early as possible.
498  *
499  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
500  * open after the stream is closed. See the documentation for the individual
501  * stream for details.
502  *
503  * On failure the first error that happened will be reported, but the close
504  * operation will finish as much as possible. A stream that failed to
505  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
506  * is important to check and report the error to the user.
507  *
508  * If @cancellable is not %NULL, then the operation can be cancelled by
509  * triggering the cancellable object from another thread. If the operation
510  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
511  * Cancelling a close will still leave the stream closed, but some streams
512  * can use a faster close that doesn't block to e.g. check errors.
513  *
514  * Returns: %TRUE on success, %FALSE on failure
515  **/
516 gboolean
g_input_stream_close(GInputStream * stream,GCancellable * cancellable,GError ** error)517 g_input_stream_close (GInputStream  *stream,
518 		      GCancellable  *cancellable,
519 		      GError       **error)
520 {
521   GInputStreamClass *class;
522   gboolean res;
523 
524   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
525 
526   class = G_INPUT_STREAM_GET_CLASS (stream);
527 
528   if (stream->priv->closed)
529     return TRUE;
530 
531   res = TRUE;
532 
533   if (!g_input_stream_set_pending (stream, error))
534     return FALSE;
535 
536   if (cancellable)
537     g_cancellable_push_current (cancellable);
538 
539   if (class->close_fn)
540     res = class->close_fn (stream, cancellable, error);
541 
542   if (cancellable)
543     g_cancellable_pop_current (cancellable);
544 
545   g_input_stream_clear_pending (stream);
546 
547   stream->priv->closed = TRUE;
548 
549   return res;
550 }
551 
552 static void
async_ready_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)553 async_ready_callback_wrapper (GObject      *source_object,
554 			      GAsyncResult *res,
555 			      gpointer      user_data)
556 {
557   GInputStream *stream = G_INPUT_STREAM (source_object);
558 
559   g_input_stream_clear_pending (stream);
560   if (stream->priv->outstanding_callback)
561     (*stream->priv->outstanding_callback) (source_object, res, user_data);
562   g_object_unref (stream);
563 }
564 
565 static void
async_ready_close_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)566 async_ready_close_callback_wrapper (GObject      *source_object,
567 				    GAsyncResult *res,
568 				    gpointer      user_data)
569 {
570   GInputStream *stream = G_INPUT_STREAM (source_object);
571 
572   g_input_stream_clear_pending (stream);
573   stream->priv->closed = TRUE;
574   if (stream->priv->outstanding_callback)
575     (*stream->priv->outstanding_callback) (source_object, res, user_data);
576   g_object_unref (stream);
577 }
578 
579 /**
580  * g_input_stream_read_async:
581  * @stream: A #GInputStream.
582  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
583  *     a buffer to read data into (which should be at least count bytes long).
584  * @count: the number of bytes that will be read from the stream
585  * @io_priority: the [I/O priority][io-priority]
586  * of the request.
587  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
588  * @callback: (scope async): callback to call when the request is satisfied
589  * @user_data: (closure): the data to pass to callback function
590  *
591  * Request an asynchronous read of @count bytes from the stream into the buffer
592  * starting at @buffer. When the operation is finished @callback will be called.
593  * You can then call g_input_stream_read_finish() to get the result of the
594  * operation.
595  *
596  * During an async request no other sync and async calls are allowed on @stream, and will
597  * result in %G_IO_ERROR_PENDING errors.
598  *
599  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
600  *
601  * On success, the number of bytes read into the buffer will be passed to the
602  * callback. It is not an error if this is not the same as the requested size, as it
603  * can happen e.g. near the end of a file, but generally we try to read
604  * as many bytes as requested. Zero is returned on end of file
605  * (or if @count is zero),  but never otherwise.
606  *
607  * Any outstanding i/o request with higher priority (lower numerical value) will
608  * be executed before an outstanding request with lower priority. Default
609  * priority is %G_PRIORITY_DEFAULT.
610  *
611  * The asynchronous methods have a default fallback that uses threads to implement
612  * asynchronicity, so they are optional for inheriting classes. However, if you
613  * override one you must override all.
614  **/
615 void
g_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)616 g_input_stream_read_async (GInputStream        *stream,
617 			   void                *buffer,
618 			   gsize                count,
619 			   int                  io_priority,
620 			   GCancellable        *cancellable,
621 			   GAsyncReadyCallback  callback,
622 			   gpointer             user_data)
623 {
624   GInputStreamClass *class;
625   GError *error = NULL;
626 
627   g_return_if_fail (G_IS_INPUT_STREAM (stream));
628   g_return_if_fail (buffer != NULL);
629 
630   if (count == 0)
631     {
632       GTask *task;
633 
634       task = g_task_new (stream, cancellable, callback, user_data);
635       g_task_set_source_tag (task, g_input_stream_read_async);
636       g_task_return_int (task, 0);
637       g_object_unref (task);
638       return;
639     }
640 
641   if (((gssize) count) < 0)
642     {
643       g_task_report_new_error (stream, callback, user_data,
644                                g_input_stream_read_async,
645                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
646                                _("Too large count value passed to %s"),
647                                G_STRFUNC);
648       return;
649     }
650 
651   if (!g_input_stream_set_pending (stream, &error))
652     {
653       g_task_report_error (stream, callback, user_data,
654                            g_input_stream_read_async,
655                            error);
656       return;
657     }
658 
659   class = G_INPUT_STREAM_GET_CLASS (stream);
660   stream->priv->outstanding_callback = callback;
661   g_object_ref (stream);
662   class->read_async (stream, buffer, count, io_priority, cancellable,
663 		     async_ready_callback_wrapper, user_data);
664 }
665 
666 /**
667  * g_input_stream_read_finish:
668  * @stream: a #GInputStream.
669  * @result: a #GAsyncResult.
670  * @error: a #GError location to store the error occurring, or %NULL to
671  * ignore.
672  *
673  * Finishes an asynchronous stream read operation.
674  *
675  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
676  **/
677 gssize
g_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)678 g_input_stream_read_finish (GInputStream  *stream,
679 			    GAsyncResult  *result,
680 			    GError       **error)
681 {
682   GInputStreamClass *class;
683 
684   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
685   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
686 
687   if (g_async_result_legacy_propagate_error (result, error))
688     return -1;
689   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
690     return g_task_propagate_int (G_TASK (result), error);
691 
692   class = G_INPUT_STREAM_GET_CLASS (stream);
693   return class->read_finish (stream, result, error);
694 }
695 
696 typedef struct
697 {
698   gchar *buffer;
699   gsize to_read;
700   gsize bytes_read;
701 } AsyncReadAll;
702 
703 static void
free_async_read_all(gpointer data)704 free_async_read_all (gpointer data)
705 {
706   g_slice_free (AsyncReadAll, data);
707 }
708 
709 static void
read_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)710 read_all_callback (GObject      *stream,
711                    GAsyncResult *result,
712                    gpointer      user_data)
713 {
714   GTask *task = user_data;
715   AsyncReadAll *data = g_task_get_task_data (task);
716   gboolean got_eof = FALSE;
717 
718   if (result)
719     {
720       GError *error = NULL;
721       gssize nread;
722 
723       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
724 
725       if (nread == -1)
726         {
727           g_task_return_error (task, error);
728           g_object_unref (task);
729           return;
730         }
731 
732       g_assert_cmpint (nread, <=, data->to_read);
733       data->to_read -= nread;
734       data->bytes_read += nread;
735       got_eof = (nread == 0);
736     }
737 
738   if (got_eof || data->to_read == 0)
739     {
740       g_task_return_boolean (task, TRUE);
741       g_object_unref (task);
742     }
743 
744   else
745     g_input_stream_read_async (G_INPUT_STREAM (stream),
746                                data->buffer + data->bytes_read,
747                                data->to_read,
748                                g_task_get_priority (task),
749                                g_task_get_cancellable (task),
750                                read_all_callback, task);
751 }
752 
753 
754 static void
read_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)755 read_all_async_thread (GTask        *task,
756                        gpointer      source_object,
757                        gpointer      task_data,
758                        GCancellable *cancellable)
759 {
760   GInputStream *stream = source_object;
761   AsyncReadAll *data = task_data;
762   GError *error = NULL;
763 
764   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
765                                g_task_get_cancellable (task), &error))
766     g_task_return_boolean (task, TRUE);
767   else
768     g_task_return_error (task, error);
769 }
770 
771 /**
772  * g_input_stream_read_all_async:
773  * @stream: A #GInputStream
774  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
775  *     a buffer to read data into (which should be at least count bytes long)
776  * @count: the number of bytes that will be read from the stream
777  * @io_priority: the [I/O priority][io-priority] of the request
778  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
779  * @callback: (scope async): callback to call when the request is satisfied
780  * @user_data: (closure): the data to pass to callback function
781  *
782  * Request an asynchronous read of @count bytes from the stream into the
783  * buffer starting at @buffer.
784  *
785  * This is the asynchronous equivalent of g_input_stream_read_all().
786  *
787  * Call g_input_stream_read_all_finish() to collect the result.
788  *
789  * Any outstanding I/O request with higher priority (lower numerical
790  * value) will be executed before an outstanding request with lower
791  * priority. Default priority is %G_PRIORITY_DEFAULT.
792  *
793  * Since: 2.44
794  **/
795 void
g_input_stream_read_all_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)796 g_input_stream_read_all_async (GInputStream        *stream,
797                                void                *buffer,
798                                gsize                count,
799                                int                  io_priority,
800                                GCancellable        *cancellable,
801                                GAsyncReadyCallback  callback,
802                                gpointer             user_data)
803 {
804   AsyncReadAll *data;
805   GTask *task;
806 
807   g_return_if_fail (G_IS_INPUT_STREAM (stream));
808   g_return_if_fail (buffer != NULL || count == 0);
809 
810   task = g_task_new (stream, cancellable, callback, user_data);
811   data = g_slice_new0 (AsyncReadAll);
812   data->buffer = buffer;
813   data->to_read = count;
814 
815   g_task_set_source_tag (task, g_input_stream_read_all_async);
816   g_task_set_task_data (task, data, free_async_read_all);
817   g_task_set_priority (task, io_priority);
818 
819   /* If async reads are going to be handled via the threadpool anyway
820    * then we may as well do it with a single dispatch instead of
821    * bouncing in and out.
822    */
823   if (g_input_stream_async_read_is_via_threads (stream))
824     {
825       g_task_run_in_thread (task, read_all_async_thread);
826       g_object_unref (task);
827     }
828   else
829     read_all_callback (G_OBJECT (stream), NULL, task);
830 }
831 
832 /**
833  * g_input_stream_read_all_finish:
834  * @stream: a #GInputStream
835  * @result: a #GAsyncResult
836  * @bytes_read: (out): location to store the number of bytes that was read from the stream
837  * @error: a #GError location to store the error occurring, or %NULL to ignore
838  *
839  * Finishes an asynchronous stream read operation started with
840  * g_input_stream_read_all_async().
841  *
842  * As a special exception to the normal conventions for functions that
843  * use #GError, if this function returns %FALSE (and sets @error) then
844  * @bytes_read will be set to the number of bytes that were successfully
845  * read before the error was encountered.  This functionality is only
846  * available from C.  If you need it from another language then you must
847  * write your own loop around g_input_stream_read_async().
848  *
849  * Returns: %TRUE on success, %FALSE if there was an error
850  *
851  * Since: 2.44
852  **/
853 gboolean
g_input_stream_read_all_finish(GInputStream * stream,GAsyncResult * result,gsize * bytes_read,GError ** error)854 g_input_stream_read_all_finish (GInputStream  *stream,
855                                 GAsyncResult  *result,
856                                 gsize         *bytes_read,
857                                 GError       **error)
858 {
859   GTask *task;
860 
861   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
862   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
863 
864   task = G_TASK (result);
865 
866   if (bytes_read)
867     {
868       AsyncReadAll *data = g_task_get_task_data (task);
869 
870       *bytes_read = data->bytes_read;
871     }
872 
873   return g_task_propagate_boolean (task, error);
874 }
875 
876 static void
read_bytes_callback(GObject * stream,GAsyncResult * result,gpointer user_data)877 read_bytes_callback (GObject      *stream,
878 		     GAsyncResult *result,
879 		     gpointer      user_data)
880 {
881   GTask *task = user_data;
882   guchar *buf = g_task_get_task_data (task);
883   GError *error = NULL;
884   gssize nread;
885   GBytes *bytes = NULL;
886 
887   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
888 				      result, &error);
889   if (nread == -1)
890     {
891       g_free (buf);
892       g_task_return_error (task, error);
893     }
894   else if (nread == 0)
895     {
896       g_free (buf);
897       bytes = g_bytes_new_static ("", 0);
898     }
899   else
900     bytes = g_bytes_new_take (buf, nread);
901 
902   if (bytes)
903     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
904 
905   g_object_unref (task);
906 }
907 
908 /**
909  * g_input_stream_read_bytes_async:
910  * @stream: A #GInputStream.
911  * @count: the number of bytes that will be read from the stream
912  * @io_priority: the [I/O priority][io-priority] of the request
913  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
914  * @callback: (scope async): callback to call when the request is satisfied
915  * @user_data: (closure): the data to pass to callback function
916  *
917  * Request an asynchronous read of @count bytes from the stream into a
918  * new #GBytes. When the operation is finished @callback will be
919  * called. You can then call g_input_stream_read_bytes_finish() to get the
920  * result of the operation.
921  *
922  * During an async request no other sync and async calls are allowed
923  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
924  *
925  * A value of @count larger than %G_MAXSSIZE will cause a
926  * %G_IO_ERROR_INVALID_ARGUMENT error.
927  *
928  * On success, the new #GBytes will be passed to the callback. It is
929  * not an error if this is smaller than the requested size, as it can
930  * happen e.g. near the end of a file, but generally we try to read as
931  * many bytes as requested. Zero is returned on end of file (or if
932  * @count is zero), but never otherwise.
933  *
934  * Any outstanding I/O request with higher priority (lower numerical
935  * value) will be executed before an outstanding request with lower
936  * priority. Default priority is %G_PRIORITY_DEFAULT.
937  *
938  * Since: 2.34
939  **/
940 void
g_input_stream_read_bytes_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)941 g_input_stream_read_bytes_async (GInputStream          *stream,
942 				 gsize                  count,
943 				 int                    io_priority,
944 				 GCancellable          *cancellable,
945 				 GAsyncReadyCallback    callback,
946 				 gpointer               user_data)
947 {
948   GTask *task;
949   guchar *buf;
950 
951   task = g_task_new (stream, cancellable, callback, user_data);
952   g_task_set_source_tag (task, g_input_stream_read_bytes_async);
953 
954   buf = g_malloc (count);
955   g_task_set_task_data (task, buf, NULL);
956 
957   g_input_stream_read_async (stream, buf, count,
958                              io_priority, cancellable,
959                              read_bytes_callback, task);
960 }
961 
962 /**
963  * g_input_stream_read_bytes_finish:
964  * @stream: a #GInputStream.
965  * @result: a #GAsyncResult.
966  * @error: a #GError location to store the error occurring, or %NULL to
967  *   ignore.
968  *
969  * Finishes an asynchronous stream read-into-#GBytes operation.
970  *
971  * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
972  *
973  * Since: 2.34
974  **/
975 GBytes *
g_input_stream_read_bytes_finish(GInputStream * stream,GAsyncResult * result,GError ** error)976 g_input_stream_read_bytes_finish (GInputStream  *stream,
977 				  GAsyncResult  *result,
978 				  GError       **error)
979 {
980   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
981   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
982 
983   return g_task_propagate_pointer (G_TASK (result), error);
984 }
985 
986 /**
987  * g_input_stream_skip_async:
988  * @stream: A #GInputStream.
989  * @count: the number of bytes that will be skipped from the stream
990  * @io_priority: the [I/O priority][io-priority] of the request
991  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
992  * @callback: (scope async): callback to call when the request is satisfied
993  * @user_data: (closure): the data to pass to callback function
994  *
995  * Request an asynchronous skip of @count bytes from the stream.
996  * When the operation is finished @callback will be called.
997  * You can then call g_input_stream_skip_finish() to get the result
998  * of the operation.
999  *
1000  * During an async request no other sync and async calls are allowed,
1001  * and will result in %G_IO_ERROR_PENDING errors.
1002  *
1003  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1004  *
1005  * On success, the number of bytes skipped will be passed to the callback.
1006  * It is not an error if this is not the same as the requested size, as it
1007  * can happen e.g. near the end of a file, but generally we try to skip
1008  * as many bytes as requested. Zero is returned on end of file
1009  * (or if @count is zero), but never otherwise.
1010  *
1011  * Any outstanding i/o request with higher priority (lower numerical value)
1012  * will be executed before an outstanding request with lower priority.
1013  * Default priority is %G_PRIORITY_DEFAULT.
1014  *
1015  * The asynchronous methods have a default fallback that uses threads to
1016  * implement asynchronicity, so they are optional for inheriting classes.
1017  * However, if you override one, you must override all.
1018  **/
1019 void
g_input_stream_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1020 g_input_stream_skip_async (GInputStream        *stream,
1021 			   gsize                count,
1022 			   int                  io_priority,
1023 			   GCancellable        *cancellable,
1024 			   GAsyncReadyCallback  callback,
1025 			   gpointer             user_data)
1026 {
1027   GInputStreamClass *class;
1028   GError *error = NULL;
1029 
1030   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1031 
1032   if (count == 0)
1033     {
1034       GTask *task;
1035 
1036       task = g_task_new (stream, cancellable, callback, user_data);
1037       g_task_set_source_tag (task, g_input_stream_skip_async);
1038       g_task_return_int (task, 0);
1039       g_object_unref (task);
1040       return;
1041     }
1042 
1043   if (((gssize) count) < 0)
1044     {
1045       g_task_report_new_error (stream, callback, user_data,
1046                                g_input_stream_skip_async,
1047                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1048                                _("Too large count value passed to %s"),
1049                                G_STRFUNC);
1050       return;
1051     }
1052 
1053   if (!g_input_stream_set_pending (stream, &error))
1054     {
1055       g_task_report_error (stream, callback, user_data,
1056                            g_input_stream_skip_async,
1057                            error);
1058       return;
1059     }
1060 
1061   class = G_INPUT_STREAM_GET_CLASS (stream);
1062   stream->priv->outstanding_callback = callback;
1063   g_object_ref (stream);
1064   class->skip_async (stream, count, io_priority, cancellable,
1065 		     async_ready_callback_wrapper, user_data);
1066 }
1067 
1068 /**
1069  * g_input_stream_skip_finish:
1070  * @stream: a #GInputStream.
1071  * @result: a #GAsyncResult.
1072  * @error: a #GError location to store the error occurring, or %NULL to
1073  * ignore.
1074  *
1075  * Finishes a stream skip operation.
1076  *
 * Returns: the number of bytes skipped, or `-1` on error.
1078  **/
1079 gssize
g_input_stream_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1080 g_input_stream_skip_finish (GInputStream  *stream,
1081 			    GAsyncResult  *result,
1082 			    GError       **error)
1083 {
1084   GInputStreamClass *class;
1085 
1086   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1087   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1088 
1089   if (g_async_result_legacy_propagate_error (result, error))
1090     return -1;
1091   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1092     return g_task_propagate_int (G_TASK (result), error);
1093 
1094   class = G_INPUT_STREAM_GET_CLASS (stream);
1095   return class->skip_finish (stream, result, error);
1096 }
1097 
1098 /**
1099  * g_input_stream_close_async:
1100  * @stream: A #GInputStream.
1101  * @io_priority: the [I/O priority][io-priority] of the request
1102  * @cancellable: (nullable): optional cancellable object
1103  * @callback: (scope async): callback to call when the request is satisfied
1104  * @user_data: (closure): the data to pass to callback function
1105  *
 * Requests an asynchronous close of the stream, releasing resources related to it.
1107  * When the operation is finished @callback will be called.
1108  * You can then call g_input_stream_close_finish() to get the result of the
1109  * operation.
1110  *
1111  * For behaviour details see g_input_stream_close().
1112  *
1113  * The asynchronous methods have a default fallback that uses threads to implement
1114  * asynchronicity, so they are optional for inheriting classes. However, if you
1115  * override one you must override all.
1116  **/
1117 void
g_input_stream_close_async(GInputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1118 g_input_stream_close_async (GInputStream        *stream,
1119 			    int                  io_priority,
1120 			    GCancellable        *cancellable,
1121 			    GAsyncReadyCallback  callback,
1122 			    gpointer             user_data)
1123 {
1124   GInputStreamClass *class;
1125   GError *error = NULL;
1126 
1127   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1128 
1129   if (stream->priv->closed)
1130     {
1131       GTask *task;
1132 
1133       task = g_task_new (stream, cancellable, callback, user_data);
1134       g_task_set_source_tag (task, g_input_stream_close_async);
1135       g_task_return_boolean (task, TRUE);
1136       g_object_unref (task);
1137       return;
1138     }
1139 
1140   if (!g_input_stream_set_pending (stream, &error))
1141     {
1142       g_task_report_error (stream, callback, user_data,
1143                            g_input_stream_close_async,
1144                            error);
1145       return;
1146     }
1147 
1148   class = G_INPUT_STREAM_GET_CLASS (stream);
1149   stream->priv->outstanding_callback = callback;
1150   g_object_ref (stream);
1151   class->close_async (stream, io_priority, cancellable,
1152 		      async_ready_close_callback_wrapper, user_data);
1153 }
1154 
1155 /**
1156  * g_input_stream_close_finish:
1157  * @stream: a #GInputStream.
1158  * @result: a #GAsyncResult.
1159  * @error: a #GError location to store the error occurring, or %NULL to
1160  * ignore.
1161  *
1162  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1163  *
1164  * Returns: %TRUE if the stream was closed successfully.
1165  **/
1166 gboolean
g_input_stream_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1167 g_input_stream_close_finish (GInputStream  *stream,
1168 			     GAsyncResult  *result,
1169 			     GError       **error)
1170 {
1171   GInputStreamClass *class;
1172 
1173   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1174   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1175 
1176   if (g_async_result_legacy_propagate_error (result, error))
1177     return FALSE;
1178   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1179     return g_task_propagate_boolean (G_TASK (result), error);
1180 
1181   class = G_INPUT_STREAM_GET_CLASS (stream);
1182   return class->close_finish (stream, result, error);
1183 }
1184 
1185 /**
1186  * g_input_stream_is_closed:
1187  * @stream: input stream.
1188  *
1189  * Checks if an input stream is closed.
1190  *
1191  * Returns: %TRUE if the stream is closed.
1192  **/
1193 gboolean
g_input_stream_is_closed(GInputStream * stream)1194 g_input_stream_is_closed (GInputStream *stream)
1195 {
1196   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1197 
1198   return stream->priv->closed;
1199 }
1200 
1201 /**
1202  * g_input_stream_has_pending:
1203  * @stream: input stream.
1204  *
1205  * Checks if an input stream has pending actions.
1206  *
1207  * Returns: %TRUE if @stream has pending actions.
1208  **/
1209 gboolean
g_input_stream_has_pending(GInputStream * stream)1210 g_input_stream_has_pending (GInputStream *stream)
1211 {
1212   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1213 
1214   return stream->priv->pending;
1215 }
1216 
1217 /**
1218  * g_input_stream_set_pending:
1219  * @stream: input stream
1220  * @error: a #GError location to store the error occurring, or %NULL to
1221  * ignore.
1222  *
1223  * Sets @stream to have actions pending. If the pending flag is
1224  * already set or @stream is closed, it will return %FALSE and set
1225  * @error.
1226  *
1227  * Returns: %TRUE if pending was previously unset and is now set.
1228  **/
1229 gboolean
g_input_stream_set_pending(GInputStream * stream,GError ** error)1230 g_input_stream_set_pending (GInputStream *stream, GError **error)
1231 {
1232   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1233 
1234   if (stream->priv->closed)
1235     {
1236       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1237                            _("Stream is already closed"));
1238       return FALSE;
1239     }
1240 
1241   if (stream->priv->pending)
1242     {
1243       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1244 		/* Translators: This is an error you get if there is already an
1245 		 * operation running against this stream when you try to start
1246 		 * one */
1247 		 _("Stream has outstanding operation"));
1248       return FALSE;
1249     }
1250 
1251   stream->priv->pending = TRUE;
1252   return TRUE;
1253 }
1254 
1255 /**
1256  * g_input_stream_clear_pending:
1257  * @stream: input stream
1258  *
1259  * Clears the pending flag on @stream.
1260  **/
1261 void
g_input_stream_clear_pending(GInputStream * stream)1262 g_input_stream_clear_pending (GInputStream *stream)
1263 {
1264   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1265 
1266   stream->priv->pending = FALSE;
1267 }
1268 
1269 /*< internal >
1270  * g_input_stream_async_read_is_via_threads:
1271  * @stream: input stream
1272  *
1273  * Checks if an input stream's read_async function uses threads.
1274  *
1275  * Returns: %TRUE if @stream's read_async function uses threads.
1276  **/
1277 gboolean
g_input_stream_async_read_is_via_threads(GInputStream * stream)1278 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1279 {
1280   GInputStreamClass *class;
1281 
1282   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1283 
1284   class = G_INPUT_STREAM_GET_CLASS (stream);
1285 
1286   return (class->read_async == g_input_stream_real_read_async &&
1287       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1288         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1289 }
1290 
1291 /*< internal >
1292  * g_input_stream_async_close_is_via_threads:
1293  * @stream: input stream
1294  *
1295  * Checks if an input stream's close_async function uses threads.
1296  *
1297  * Returns: %TRUE if @stream's close_async function uses threads.
1298  **/
1299 gboolean
g_input_stream_async_close_is_via_threads(GInputStream * stream)1300 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1301 {
1302   GInputStreamClass *class;
1303 
1304   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1305 
1306   class = G_INPUT_STREAM_GET_CLASS (stream);
1307 
1308   return class->close_async == g_input_stream_real_close_async;
1309 }
1310 
1311 /********************************************
1312  *   Default implementation of async ops    *
1313  ********************************************/
1314 
1315 typedef struct {
1316   void   *buffer;
1317   gsize   count;
1318 } ReadData;
1319 
1320 static void
free_read_data(ReadData * op)1321 free_read_data (ReadData *op)
1322 {
1323   g_slice_free (ReadData, op);
1324 }
1325 
1326 static void
read_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1327 read_async_thread (GTask        *task,
1328                    gpointer      source_object,
1329                    gpointer      task_data,
1330                    GCancellable *cancellable)
1331 {
1332   GInputStream *stream = source_object;
1333   ReadData *op = task_data;
1334   GInputStreamClass *class;
1335   GError *error = NULL;
1336   gssize nread;
1337 
1338   class = G_INPUT_STREAM_GET_CLASS (stream);
1339 
1340   nread = class->read_fn (stream,
1341                           op->buffer, op->count,
1342                           g_task_get_cancellable (task),
1343                           &error);
1344   if (nread == -1)
1345     g_task_return_error (task, error);
1346   else
1347     g_task_return_int (task, nread);
1348 }
1349 
1350 static void read_async_pollable (GPollableInputStream *stream,
1351                                  GTask                *task);
1352 
1353 static gboolean
read_async_pollable_ready(GPollableInputStream * stream,gpointer user_data)1354 read_async_pollable_ready (GPollableInputStream *stream,
1355 			   gpointer              user_data)
1356 {
1357   GTask *task = user_data;
1358 
1359   read_async_pollable (stream, task);
1360   return FALSE;
1361 }
1362 
1363 static void
read_async_pollable(GPollableInputStream * stream,GTask * task)1364 read_async_pollable (GPollableInputStream *stream,
1365                      GTask                *task)
1366 {
1367   ReadData *op = g_task_get_task_data (task);
1368   GError *error = NULL;
1369   gssize nread;
1370 
1371   if (g_task_return_error_if_cancelled (task))
1372     return;
1373 
1374   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1375     read_nonblocking (stream, op->buffer, op->count, &error);
1376 
1377   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1378     {
1379       GSource *source;
1380 
1381       g_error_free (error);
1382 
1383       source = g_pollable_input_stream_create_source (stream,
1384                                                       g_task_get_cancellable (task));
1385       g_task_attach_source (task, source,
1386                             (GSourceFunc) read_async_pollable_ready);
1387       g_source_unref (source);
1388       return;
1389     }
1390 
1391   if (nread == -1)
1392     g_task_return_error (task, error);
1393   else
1394     g_task_return_int (task, nread);
1395   /* g_input_stream_real_read_async() unrefs task */
1396 }
1397 
1398 
1399 static void
g_input_stream_real_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1400 g_input_stream_real_read_async (GInputStream        *stream,
1401 				void                *buffer,
1402 				gsize                count,
1403 				int                  io_priority,
1404 				GCancellable        *cancellable,
1405 				GAsyncReadyCallback  callback,
1406 				gpointer             user_data)
1407 {
1408   GTask *task;
1409   ReadData *op;
1410 
1411   op = g_slice_new0 (ReadData);
1412   task = g_task_new (stream, cancellable, callback, user_data);
1413   g_task_set_source_tag (task, g_input_stream_real_read_async);
1414   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1415   g_task_set_priority (task, io_priority);
1416   op->buffer = buffer;
1417   op->count = count;
1418 
1419   if (!g_input_stream_async_read_is_via_threads (stream))
1420     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1421   else
1422     g_task_run_in_thread (task, read_async_thread);
1423   g_object_unref (task);
1424 }
1425 
1426 static gssize
g_input_stream_real_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1427 g_input_stream_real_read_finish (GInputStream  *stream,
1428 				 GAsyncResult  *result,
1429 				 GError       **error)
1430 {
1431   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1432 
1433   return g_task_propagate_int (G_TASK (result), error);
1434 }
1435 
1436 
1437 static void
skip_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1438 skip_async_thread (GTask        *task,
1439                    gpointer      source_object,
1440                    gpointer      task_data,
1441                    GCancellable *cancellable)
1442 {
1443   GInputStream *stream = source_object;
1444   gsize count = GPOINTER_TO_SIZE (task_data);
1445   GInputStreamClass *class;
1446   GError *error = NULL;
1447   gssize ret;
1448 
1449   class = G_INPUT_STREAM_GET_CLASS (stream);
1450   ret = class->skip (stream, count,
1451                      g_task_get_cancellable (task),
1452                      &error);
1453   if (ret == -1)
1454     g_task_return_error (task, error);
1455   else
1456     g_task_return_int (task, ret);
1457 }
1458 
1459 typedef struct {
1460   char buffer[8192];
1461   gsize count;
1462   gsize count_skipped;
1463 } SkipFallbackAsyncData;
1464 
1465 static void
skip_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1466 skip_callback_wrapper (GObject      *source_object,
1467 		       GAsyncResult *res,
1468 		       gpointer      user_data)
1469 {
1470   GInputStreamClass *class;
1471   GTask *task = user_data;
1472   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1473   GError *error = NULL;
1474   gssize ret;
1475 
1476   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1477 
1478   if (ret > 0)
1479     {
1480       data->count -= ret;
1481       data->count_skipped += ret;
1482 
1483       if (data->count > 0)
1484 	{
1485 	  class = G_INPUT_STREAM_GET_CLASS (source_object);
1486 	  class->read_async (G_INPUT_STREAM (source_object),
1487                              data->buffer, MIN (8192, data->count),
1488                              g_task_get_priority (task),
1489                              g_task_get_cancellable (task),
1490                              skip_callback_wrapper, task);
1491 	  return;
1492 	}
1493     }
1494 
1495   if (ret == -1 &&
1496       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1497       data->count_skipped)
1498     {
1499       /* No error, return partial read */
1500       g_clear_error (&error);
1501     }
1502 
1503   if (error)
1504     g_task_return_error (task, error);
1505   else
1506     g_task_return_int (task, data->count_skipped);
1507   g_object_unref (task);
1508  }
1509 
1510 static void
g_input_stream_real_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1511 g_input_stream_real_skip_async (GInputStream        *stream,
1512 				gsize                count,
1513 				int                  io_priority,
1514 				GCancellable        *cancellable,
1515 				GAsyncReadyCallback  callback,
1516 				gpointer             user_data)
1517 {
1518   GInputStreamClass *class;
1519   SkipFallbackAsyncData *data;
1520   GTask *task;
1521 
1522   class = G_INPUT_STREAM_GET_CLASS (stream);
1523 
1524   task = g_task_new (stream, cancellable, callback, user_data);
1525   g_task_set_source_tag (task, g_input_stream_real_skip_async);
1526   g_task_set_priority (task, io_priority);
1527 
1528   if (g_input_stream_async_read_is_via_threads (stream))
1529     {
1530       /* Read is thread-using async fallback.
1531        * Make skip use threads too, so that we can use a possible sync skip
1532        * implementation. */
1533       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1534 
1535       g_task_run_in_thread (task, skip_async_thread);
1536       g_object_unref (task);
1537     }
1538   else
1539     {
1540       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1541 
1542       /* There is a custom async read function, lets use that. */
1543       data = g_new (SkipFallbackAsyncData, 1);
1544       data->count = count;
1545       data->count_skipped = 0;
1546       g_task_set_task_data (task, data, g_free);
1547       g_task_set_check_cancellable (task, FALSE);
1548       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1549 			 skip_callback_wrapper, task);
1550     }
1551 
1552 }
1553 
1554 static gssize
g_input_stream_real_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1555 g_input_stream_real_skip_finish (GInputStream  *stream,
1556 				 GAsyncResult  *result,
1557 				 GError       **error)
1558 {
1559   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1560 
1561   return g_task_propagate_int (G_TASK (result), error);
1562 }
1563 
1564 static void
close_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1565 close_async_thread (GTask        *task,
1566                     gpointer      source_object,
1567                     gpointer      task_data,
1568                     GCancellable *cancellable)
1569 {
1570   GInputStream *stream = source_object;
1571   GInputStreamClass *class;
1572   GError *error = NULL;
1573   gboolean result;
1574 
1575   class = G_INPUT_STREAM_GET_CLASS (stream);
1576   if (class->close_fn)
1577     {
1578       result = class->close_fn (stream,
1579                                 g_task_get_cancellable (task),
1580                                 &error);
1581       if (!result)
1582         {
1583           g_task_return_error (task, error);
1584           return;
1585         }
1586     }
1587 
1588   g_task_return_boolean (task, TRUE);
1589 }
1590 
1591 static void
g_input_stream_real_close_async(GInputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1592 g_input_stream_real_close_async (GInputStream        *stream,
1593 				 int                  io_priority,
1594 				 GCancellable        *cancellable,
1595 				 GAsyncReadyCallback  callback,
1596 				 gpointer             user_data)
1597 {
1598   GTask *task;
1599 
1600   task = g_task_new (stream, cancellable, callback, user_data);
1601   g_task_set_source_tag (task, g_input_stream_real_close_async);
1602   g_task_set_check_cancellable (task, FALSE);
1603   g_task_set_priority (task, io_priority);
1604 
1605   g_task_run_in_thread (task, close_async_thread);
1606   g_object_unref (task);
1607 }
1608 
1609 static gboolean
g_input_stream_real_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1610 g_input_stream_real_close_finish (GInputStream  *stream,
1611 				  GAsyncResult  *result,
1612 				  GError       **error)
1613 {
1614   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1615 
1616   return g_task_propagate_boolean (G_TASK (result), error);
1617 }
1618