• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20 
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24 
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32 
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()).
41  *
42  * To copy the content of an input stream to an output stream without
43  * manually handling the reads and writes, use g_output_stream_splice().
44  *
45  * See the documentation for #GIOStream for details of thread safety of
46  * streaming APIs.
47  *
48  * All of these functions have async variants too.
49  **/
50 
51 struct _GInputStreamPrivate {
52   guint closed : 1;
53   guint pending : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56 
57 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
58 
59 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
60 						  gsize                 count,
61 						  GCancellable         *cancellable,
62 						  GError              **error);
63 static void     g_input_stream_real_read_async   (GInputStream         *stream,
64 						  void                 *buffer,
65 						  gsize                 count,
66 						  int                   io_priority,
67 						  GCancellable         *cancellable,
68 						  GAsyncReadyCallback   callback,
69 						  gpointer              user_data);
70 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
71 						  GAsyncResult         *result,
72 						  GError              **error);
73 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
74 						  gsize                 count,
75 						  int                   io_priority,
76 						  GCancellable         *cancellable,
77 						  GAsyncReadyCallback   callback,
78 						  gpointer              data);
79 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
80 						  GAsyncResult         *result,
81 						  GError              **error);
82 static void     g_input_stream_real_close_async  (GInputStream         *stream,
83 						  int                   io_priority,
84 						  GCancellable         *cancellable,
85 						  GAsyncReadyCallback   callback,
86 						  gpointer              data);
87 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
88 						  GAsyncResult         *result,
89 						  GError              **error);
90 
91 static void
g_input_stream_dispose(GObject * object)92 g_input_stream_dispose (GObject *object)
93 {
94   GInputStream *stream;
95 
96   stream = G_INPUT_STREAM (object);
97 
98   if (!stream->priv->closed)
99     g_input_stream_close (stream, NULL, NULL);
100 
101   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
102 }
103 
104 
105 static void
g_input_stream_class_init(GInputStreamClass * klass)106 g_input_stream_class_init (GInputStreamClass *klass)
107 {
108   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109 
110   gobject_class->dispose = g_input_stream_dispose;
111 
112   klass->skip = g_input_stream_real_skip;
113   klass->read_async = g_input_stream_real_read_async;
114   klass->read_finish = g_input_stream_real_read_finish;
115   klass->skip_async = g_input_stream_real_skip_async;
116   klass->skip_finish = g_input_stream_real_skip_finish;
117   klass->close_async = g_input_stream_real_close_async;
118   klass->close_finish = g_input_stream_real_close_finish;
119 }
120 
121 static void
g_input_stream_init(GInputStream * stream)122 g_input_stream_init (GInputStream *stream)
123 {
124   stream->priv = g_input_stream_get_instance_private (stream);
125 }
126 
127 /**
128  * g_input_stream_read:
129  * @stream: a #GInputStream.
130  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131  *     a buffer to read data into (which should be at least count bytes long).
132  * @count: the number of bytes that will be read from the stream
133  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134  * @error: location to store the error occurring, or %NULL to ignore
135  *
136  * Tries to read @count bytes from the stream into the buffer starting at
137  * @buffer. Will block during this read.
138  *
139  * If @count is zero, returns zero and does nothing. A value of @count
140  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141  *
142  * On success, the number of bytes read into the buffer is returned.
143  * It is not an error if this is not the same as the requested size, as it
144  * can happen e.g. near the end of a file. Zero is returned on end of file
145  * (or if @count is zero),  but never otherwise.
146  *
147  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148  * at any position, and this function doesn't nul-terminate the @buffer.
149  *
150  * If @cancellable is not %NULL, then the operation can be cancelled by
151  * triggering the cancellable object from another thread. If the operation
152  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153  * operation was partially finished when the operation was cancelled the
154  * partial result will be returned, without an error.
155  *
156  * On error -1 is returned and @error is set accordingly.
157  *
158  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
159  **/
160 gssize
g_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)161 g_input_stream_read  (GInputStream  *stream,
162 		      void          *buffer,
163 		      gsize          count,
164 		      GCancellable  *cancellable,
165 		      GError       **error)
166 {
167   GInputStreamClass *class;
168   gssize res;
169 
170   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171   g_return_val_if_fail (buffer != NULL, 0);
172 
173   if (count == 0)
174     return 0;
175 
176   if (((gssize) count) < 0)
177     {
178       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179 		   _("Too large count value passed to %s"), G_STRFUNC);
180       return -1;
181     }
182 
183   class = G_INPUT_STREAM_GET_CLASS (stream);
184 
185   if (class->read_fn == NULL)
186     {
187       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188                            _("Input stream doesn’t implement read"));
189       return -1;
190     }
191 
192   if (!g_input_stream_set_pending (stream, error))
193     return -1;
194 
195   if (cancellable)
196     g_cancellable_push_current (cancellable);
197 
198   res = class->read_fn (stream, buffer, count, cancellable, error);
199 
200   if (cancellable)
201     g_cancellable_pop_current (cancellable);
202 
203   g_input_stream_clear_pending (stream);
204 
205   return res;
206 }
207 
208 /**
209  * g_input_stream_read_all:
210  * @stream: a #GInputStream.
211  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212  *     a buffer to read data into (which should be at least count bytes long).
213  * @count: the number of bytes that will be read from the stream
214  * @bytes_read: (out): location to store the number of bytes that were read from the stream
215  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216  * @error: location to store the error occurring, or %NULL to ignore
217  *
218  * Tries to read @count bytes from the stream into the buffer starting at
219  * @buffer. Will block during this read.
220  *
221  * This function is similar to g_input_stream_read(), except it tries to
222  * read as many bytes as requested, only stopping on an error or end of stream.
223  *
224  * On a successful read of @count bytes, or if we reached the end of the
225  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
226  * read into @buffer.
227  *
228  * If there is an error during the operation %FALSE is returned and @error
229  * is set to indicate the error status.
230  *
231  * As a special exception to the normal conventions for functions that
232  * use #GError, if this function returns %FALSE (and sets @error) then
233  * @bytes_read will be set to the number of bytes that were successfully
234  * read before the error was encountered.  This functionality is only
235  * available from C.  If you need it from another language then you must
236  * write your own loop around g_input_stream_read().
237  *
238  * Returns: %TRUE on success, %FALSE if there was an error
239  **/
240 gboolean
g_input_stream_read_all(GInputStream * stream,void * buffer,gsize count,gsize * bytes_read,GCancellable * cancellable,GError ** error)241 g_input_stream_read_all (GInputStream  *stream,
242 			 void          *buffer,
243 			 gsize          count,
244 			 gsize         *bytes_read,
245 			 GCancellable  *cancellable,
246 			 GError       **error)
247 {
248   gsize _bytes_read;
249   gssize res;
250 
251   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252   g_return_val_if_fail (buffer != NULL, FALSE);
253 
254   _bytes_read = 0;
255   while (_bytes_read < count)
256     {
257       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258 				 cancellable, error);
259       if (res == -1)
260 	{
261 	  if (bytes_read)
262 	    *bytes_read = _bytes_read;
263 	  return FALSE;
264 	}
265 
266       if (res == 0)
267 	break;
268 
269       _bytes_read += res;
270     }
271 
272   if (bytes_read)
273     *bytes_read = _bytes_read;
274   return TRUE;
275 }
276 
277 /**
278  * g_input_stream_read_bytes:
279  * @stream: a #GInputStream.
280  * @count: maximum number of bytes that will be read from the stream. Common
281  * values include 4096 and 8192.
282  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283  * @error: location to store the error occurring, or %NULL to ignore
284  *
285  * Like g_input_stream_read(), this tries to read @count bytes from
286  * the stream in a blocking fashion. However, rather than reading into
287  * a user-supplied buffer, this will create a new #GBytes containing
288  * the data that was read. This may be easier to use from language
289  * bindings.
290  *
291  * If @count is zero, returns a zero-length #GBytes and does nothing. A
292  * value of @count larger than %G_MAXSSIZE will cause a
293  * %G_IO_ERROR_INVALID_ARGUMENT error.
294  *
295  * On success, a new #GBytes is returned. It is not an error if the
296  * size of this object is not the same as the requested size, as it
297  * can happen e.g. near the end of a file. A zero-length #GBytes is
298  * returned on end of file (or if @count is zero), but never
299  * otherwise.
300  *
301  * If @cancellable is not %NULL, then the operation can be cancelled by
302  * triggering the cancellable object from another thread. If the operation
303  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304  * operation was partially finished when the operation was cancelled the
305  * partial result will be returned, without an error.
306  *
307  * On error %NULL is returned and @error is set accordingly.
308  *
309  * Returns: (transfer full): a new #GBytes, or %NULL on error
310  *
311  * Since: 2.34
312  **/
313 GBytes *
g_input_stream_read_bytes(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)314 g_input_stream_read_bytes (GInputStream  *stream,
315 			   gsize          count,
316 			   GCancellable  *cancellable,
317 			   GError       **error)
318 {
319   guchar *buf;
320   gssize nread;
321 
322   buf = g_malloc (count);
323   nread = g_input_stream_read (stream, buf, count, cancellable, error);
324   if (nread == -1)
325     {
326       g_free (buf);
327       return NULL;
328     }
329   else if (nread == 0)
330     {
331       g_free (buf);
332       return g_bytes_new_static ("", 0);
333     }
334   else
335     return g_bytes_new_take (buf, nread);
336 }
337 
338 /**
339  * g_input_stream_skip:
340  * @stream: a #GInputStream.
341  * @count: the number of bytes that will be skipped from the stream
342  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
343  * @error: location to store the error occurring, or %NULL to ignore
344  *
345  * Tries to skip @count bytes from the stream. Will block during the operation.
346  *
347  * This is identical to g_input_stream_read(), from a behaviour standpoint,
348  * but the bytes that are skipped are not returned to the user. Some
349  * streams have an implementation that is more efficient than reading the data.
350  *
351  * This function is optional for inherited classes, as the default implementation
352  * emulates it using read.
353  *
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357  * operation was partially finished when the operation was cancelled the
358  * partial result will be returned, without an error.
359  *
360  * Returns: Number of bytes skipped, or -1 on error
361  **/
362 gssize
g_input_stream_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)363 g_input_stream_skip (GInputStream  *stream,
364 		     gsize          count,
365 		     GCancellable  *cancellable,
366 		     GError       **error)
367 {
368   GInputStreamClass *class;
369   gssize res;
370 
371   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
372 
373   if (count == 0)
374     return 0;
375 
376   if (((gssize) count) < 0)
377     {
378       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379 		   _("Too large count value passed to %s"), G_STRFUNC);
380       return -1;
381     }
382 
383   class = G_INPUT_STREAM_GET_CLASS (stream);
384 
385   if (!g_input_stream_set_pending (stream, error))
386     return -1;
387 
388   if (cancellable)
389     g_cancellable_push_current (cancellable);
390 
391   res = class->skip (stream, count, cancellable, error);
392 
393   if (cancellable)
394     g_cancellable_pop_current (cancellable);
395 
396   g_input_stream_clear_pending (stream);
397 
398   return res;
399 }
400 
401 static gssize
g_input_stream_real_skip(GInputStream * stream,gsize count,GCancellable * cancellable,GError ** error)402 g_input_stream_real_skip (GInputStream  *stream,
403 			  gsize          count,
404 			  GCancellable  *cancellable,
405 			  GError       **error)
406 {
407   GInputStreamClass *class;
408   gssize ret, read_bytes;
409   char buffer[8192];
410   GError *my_error;
411 
412   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
413     {
414       if (g_seekable_seek (G_SEEKABLE (stream),
415 			   count,
416 			   G_SEEK_CUR,
417 			   cancellable,
418 			   NULL))
419 	return count;
420     }
421 
422   /* If not seekable, or seek failed, fall back to reading data: */
423 
424   class = G_INPUT_STREAM_GET_CLASS (stream);
425 
426   read_bytes = 0;
427   while (1)
428     {
429       my_error = NULL;
430 
431       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
432                             cancellable, &my_error);
433       if (ret == -1)
434 	{
435 	  if (read_bytes > 0 &&
436 	      my_error->domain == G_IO_ERROR &&
437 	      my_error->code == G_IO_ERROR_CANCELLED)
438 	    {
439 	      g_error_free (my_error);
440 	      return read_bytes;
441 	    }
442 
443 	  g_propagate_error (error, my_error);
444 	  return -1;
445 	}
446 
447       count -= ret;
448       read_bytes += ret;
449 
450       if (ret == 0 || count == 0)
451         return read_bytes;
452     }
453 }
454 
455 /**
456  * g_input_stream_close:
457  * @stream: A #GInputStream.
458  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
459  * @error: location to store the error occurring, or %NULL to ignore
460  *
461  * Closes the stream, releasing resources related to it.
462  *
463  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
464  * Closing a stream multiple times will not return an error.
465  *
466  * Streams will be automatically closed when the last reference
467  * is dropped, but you might want to call this function to make sure
468  * resources are released as early as possible.
469  *
470  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
471  * open after the stream is closed. See the documentation for the individual
472  * stream for details.
473  *
474  * On failure the first error that happened will be reported, but the close
475  * operation will finish as much as possible. A stream that failed to
476  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
477  * is important to check and report the error to the user.
478  *
479  * If @cancellable is not %NULL, then the operation can be cancelled by
480  * triggering the cancellable object from another thread. If the operation
481  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
482  * Cancelling a close will still leave the stream closed, but some streams
483  * can use a faster close that doesn't block to e.g. check errors.
484  *
485  * Returns: %TRUE on success, %FALSE on failure
486  **/
487 gboolean
g_input_stream_close(GInputStream * stream,GCancellable * cancellable,GError ** error)488 g_input_stream_close (GInputStream  *stream,
489 		      GCancellable  *cancellable,
490 		      GError       **error)
491 {
492   GInputStreamClass *class;
493   gboolean res;
494 
495   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
496 
497   class = G_INPUT_STREAM_GET_CLASS (stream);
498 
499   if (stream->priv->closed)
500     return TRUE;
501 
502   res = TRUE;
503 
504   if (!g_input_stream_set_pending (stream, error))
505     return FALSE;
506 
507   if (cancellable)
508     g_cancellable_push_current (cancellable);
509 
510   if (class->close_fn)
511     res = class->close_fn (stream, cancellable, error);
512 
513   if (cancellable)
514     g_cancellable_pop_current (cancellable);
515 
516   g_input_stream_clear_pending (stream);
517 
518   stream->priv->closed = TRUE;
519 
520   return res;
521 }
522 
523 static void
async_ready_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)524 async_ready_callback_wrapper (GObject      *source_object,
525 			      GAsyncResult *res,
526 			      gpointer      user_data)
527 {
528   GInputStream *stream = G_INPUT_STREAM (source_object);
529 
530   g_input_stream_clear_pending (stream);
531   if (stream->priv->outstanding_callback)
532     (*stream->priv->outstanding_callback) (source_object, res, user_data);
533   g_object_unref (stream);
534 }
535 
536 static void
async_ready_close_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)537 async_ready_close_callback_wrapper (GObject      *source_object,
538 				    GAsyncResult *res,
539 				    gpointer      user_data)
540 {
541   GInputStream *stream = G_INPUT_STREAM (source_object);
542 
543   g_input_stream_clear_pending (stream);
544   stream->priv->closed = TRUE;
545   if (stream->priv->outstanding_callback)
546     (*stream->priv->outstanding_callback) (source_object, res, user_data);
547   g_object_unref (stream);
548 }
549 
550 /**
551  * g_input_stream_read_async:
552  * @stream: A #GInputStream.
553  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
554  *     a buffer to read data into (which should be at least count bytes long).
555  * @count: the number of bytes that will be read from the stream
556  * @io_priority: the [I/O priority][io-priority]
557  * of the request.
558  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
559  * @callback: (scope async): callback to call when the request is satisfied
560  * @user_data: (closure): the data to pass to callback function
561  *
562  * Request an asynchronous read of @count bytes from the stream into the buffer
563  * starting at @buffer. When the operation is finished @callback will be called.
564  * You can then call g_input_stream_read_finish() to get the result of the
565  * operation.
566  *
567  * During an async request no other sync and async calls are allowed on @stream, and will
568  * result in %G_IO_ERROR_PENDING errors.
569  *
570  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
571  *
572  * On success, the number of bytes read into the buffer will be passed to the
573  * callback. It is not an error if this is not the same as the requested size, as it
574  * can happen e.g. near the end of a file, but generally we try to read
575  * as many bytes as requested. Zero is returned on end of file
576  * (or if @count is zero),  but never otherwise.
577  *
578  * Any outstanding i/o request with higher priority (lower numerical value) will
579  * be executed before an outstanding request with lower priority. Default
580  * priority is %G_PRIORITY_DEFAULT.
581  *
582  * The asynchronous methods have a default fallback that uses threads to implement
583  * asynchronicity, so they are optional for inheriting classes. However, if you
584  * override one you must override all.
585  **/
586 void
g_input_stream_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)587 g_input_stream_read_async (GInputStream        *stream,
588 			   void                *buffer,
589 			   gsize                count,
590 			   int                  io_priority,
591 			   GCancellable        *cancellable,
592 			   GAsyncReadyCallback  callback,
593 			   gpointer             user_data)
594 {
595   GInputStreamClass *class;
596   GError *error = NULL;
597 
598   g_return_if_fail (G_IS_INPUT_STREAM (stream));
599   g_return_if_fail (buffer != NULL);
600 
601   if (count == 0)
602     {
603       GTask *task;
604 
605       task = g_task_new (stream, cancellable, callback, user_data);
606       g_task_set_source_tag (task, g_input_stream_read_async);
607       g_task_return_int (task, 0);
608       g_object_unref (task);
609       return;
610     }
611 
612   if (((gssize) count) < 0)
613     {
614       g_task_report_new_error (stream, callback, user_data,
615                                g_input_stream_read_async,
616                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
617                                _("Too large count value passed to %s"),
618                                G_STRFUNC);
619       return;
620     }
621 
622   if (!g_input_stream_set_pending (stream, &error))
623     {
624       g_task_report_error (stream, callback, user_data,
625                            g_input_stream_read_async,
626                            error);
627       return;
628     }
629 
630   class = G_INPUT_STREAM_GET_CLASS (stream);
631   stream->priv->outstanding_callback = callback;
632   g_object_ref (stream);
633   class->read_async (stream, buffer, count, io_priority, cancellable,
634 		     async_ready_callback_wrapper, user_data);
635 }
636 
637 /**
638  * g_input_stream_read_finish:
639  * @stream: a #GInputStream.
640  * @result: a #GAsyncResult.
641  * @error: a #GError location to store the error occurring, or %NULL to
642  * ignore.
643  *
644  * Finishes an asynchronous stream read operation.
645  *
646  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
647  **/
648 gssize
g_input_stream_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)649 g_input_stream_read_finish (GInputStream  *stream,
650 			    GAsyncResult  *result,
651 			    GError       **error)
652 {
653   GInputStreamClass *class;
654 
655   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
656   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
657 
658   if (g_async_result_legacy_propagate_error (result, error))
659     return -1;
660   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
661     return g_task_propagate_int (G_TASK (result), error);
662 
663   class = G_INPUT_STREAM_GET_CLASS (stream);
664   return class->read_finish (stream, result, error);
665 }
666 
667 typedef struct
668 {
669   gchar *buffer;
670   gsize to_read;
671   gsize bytes_read;
672 } AsyncReadAll;
673 
674 static void
free_async_read_all(gpointer data)675 free_async_read_all (gpointer data)
676 {
677   g_slice_free (AsyncReadAll, data);
678 }
679 
680 static void
read_all_callback(GObject * stream,GAsyncResult * result,gpointer user_data)681 read_all_callback (GObject      *stream,
682                    GAsyncResult *result,
683                    gpointer      user_data)
684 {
685   GTask *task = user_data;
686   AsyncReadAll *data = g_task_get_task_data (task);
687   gboolean got_eof = FALSE;
688 
689   if (result)
690     {
691       GError *error = NULL;
692       gssize nread;
693 
694       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
695 
696       if (nread == -1)
697         {
698           g_task_return_error (task, error);
699           g_object_unref (task);
700           return;
701         }
702 
703       g_assert_cmpint (nread, <=, data->to_read);
704       data->to_read -= nread;
705       data->bytes_read += nread;
706       got_eof = (nread == 0);
707     }
708 
709   if (got_eof || data->to_read == 0)
710     {
711       g_task_return_boolean (task, TRUE);
712       g_object_unref (task);
713     }
714 
715   else
716     g_input_stream_read_async (G_INPUT_STREAM (stream),
717                                data->buffer + data->bytes_read,
718                                data->to_read,
719                                g_task_get_priority (task),
720                                g_task_get_cancellable (task),
721                                read_all_callback, task);
722 }
723 
724 
725 static void
read_all_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)726 read_all_async_thread (GTask        *task,
727                        gpointer      source_object,
728                        gpointer      task_data,
729                        GCancellable *cancellable)
730 {
731   GInputStream *stream = source_object;
732   AsyncReadAll *data = task_data;
733   GError *error = NULL;
734 
735   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
736                                g_task_get_cancellable (task), &error))
737     g_task_return_boolean (task, TRUE);
738   else
739     g_task_return_error (task, error);
740 }
741 
742 /**
743  * g_input_stream_read_all_async:
744  * @stream: A #GInputStream
745  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
746  *     a buffer to read data into (which should be at least count bytes long)
747  * @count: the number of bytes that will be read from the stream
748  * @io_priority: the [I/O priority][io-priority] of the request
749  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
750  * @callback: (scope async): callback to call when the request is satisfied
751  * @user_data: (closure): the data to pass to callback function
752  *
753  * Request an asynchronous read of @count bytes from the stream into the
754  * buffer starting at @buffer.
755  *
756  * This is the asynchronous equivalent of g_input_stream_read_all().
757  *
758  * Call g_input_stream_read_all_finish() to collect the result.
759  *
760  * Any outstanding I/O request with higher priority (lower numerical
761  * value) will be executed before an outstanding request with lower
762  * priority. Default priority is %G_PRIORITY_DEFAULT.
763  *
764  * Since: 2.44
765  **/
766 void
g_input_stream_read_all_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)767 g_input_stream_read_all_async (GInputStream        *stream,
768                                void                *buffer,
769                                gsize                count,
770                                int                  io_priority,
771                                GCancellable        *cancellable,
772                                GAsyncReadyCallback  callback,
773                                gpointer             user_data)
774 {
775   AsyncReadAll *data;
776   GTask *task;
777 
778   g_return_if_fail (G_IS_INPUT_STREAM (stream));
779   g_return_if_fail (buffer != NULL || count == 0);
780 
781   task = g_task_new (stream, cancellable, callback, user_data);
782   data = g_slice_new0 (AsyncReadAll);
783   data->buffer = buffer;
784   data->to_read = count;
785 
786   g_task_set_source_tag (task, g_input_stream_read_all_async);
787   g_task_set_task_data (task, data, free_async_read_all);
788   g_task_set_priority (task, io_priority);
789 
790   /* If async reads are going to be handled via the threadpool anyway
791    * then we may as well do it with a single dispatch instead of
792    * bouncing in and out.
793    */
794   if (g_input_stream_async_read_is_via_threads (stream))
795     {
796       g_task_run_in_thread (task, read_all_async_thread);
797       g_object_unref (task);
798     }
799   else
800     read_all_callback (G_OBJECT (stream), NULL, task);
801 }
802 
803 /**
804  * g_input_stream_read_all_finish:
805  * @stream: a #GInputStream
806  * @result: a #GAsyncResult
807  * @bytes_read: (out): location to store the number of bytes that were read from the stream
808  * @error: a #GError location to store the error occurring, or %NULL to ignore
809  *
810  * Finishes an asynchronous stream read operation started with
811  * g_input_stream_read_all_async().
812  *
813  * As a special exception to the normal conventions for functions that
814  * use #GError, if this function returns %FALSE (and sets @error) then
815  * @bytes_read will be set to the number of bytes that were successfully
816  * read before the error was encountered.  This functionality is only
817  * available from C.  If you need it from another language then you must
818  * write your own loop around g_input_stream_read_async().
819  *
820  * Returns: %TRUE on success, %FALSE if there was an error
821  *
822  * Since: 2.44
823  **/
824 gboolean
g_input_stream_read_all_finish(GInputStream * stream,GAsyncResult * result,gsize * bytes_read,GError ** error)825 g_input_stream_read_all_finish (GInputStream  *stream,
826                                 GAsyncResult  *result,
827                                 gsize         *bytes_read,
828                                 GError       **error)
829 {
830   GTask *task;
831 
832   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
833   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
834 
835   task = G_TASK (result);
836 
837   if (bytes_read)
838     {
839       AsyncReadAll *data = g_task_get_task_data (task);
840 
841       *bytes_read = data->bytes_read;
842     }
843 
844   return g_task_propagate_boolean (task, error);
845 }
846 
847 static void
read_bytes_callback(GObject * stream,GAsyncResult * result,gpointer user_data)848 read_bytes_callback (GObject      *stream,
849 		     GAsyncResult *result,
850 		     gpointer      user_data)
851 {
852   GTask *task = user_data;
853   guchar *buf = g_task_get_task_data (task);
854   GError *error = NULL;
855   gssize nread;
856   GBytes *bytes = NULL;
857 
858   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
859 				      result, &error);
860   if (nread == -1)
861     {
862       g_free (buf);
863       g_task_return_error (task, error);
864     }
865   else if (nread == 0)
866     {
867       g_free (buf);
868       bytes = g_bytes_new_static ("", 0);
869     }
870   else
871     bytes = g_bytes_new_take (buf, nread);
872 
873   if (bytes)
874     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
875 
876   g_object_unref (task);
877 }
878 
879 /**
880  * g_input_stream_read_bytes_async:
881  * @stream: A #GInputStream.
882  * @count: the number of bytes that will be read from the stream
883  * @io_priority: the [I/O priority][io-priority] of the request
884  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
885  * @callback: (scope async): callback to call when the request is satisfied
886  * @user_data: (closure): the data to pass to callback function
887  *
888  * Request an asynchronous read of @count bytes from the stream into a
889  * new #GBytes. When the operation is finished @callback will be
890  * called. You can then call g_input_stream_read_bytes_finish() to get the
891  * result of the operation.
892  *
893  * During an async request no other sync and async calls are allowed
894  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
895  *
896  * A value of @count larger than %G_MAXSSIZE will cause a
897  * %G_IO_ERROR_INVALID_ARGUMENT error.
898  *
899  * On success, the new #GBytes will be passed to the callback. It is
900  * not an error if this is smaller than the requested size, as it can
901  * happen e.g. near the end of a file, but generally we try to read as
902  * many bytes as requested. Zero is returned on end of file (or if
903  * @count is zero), but never otherwise.
904  *
905  * Any outstanding I/O request with higher priority (lower numerical
906  * value) will be executed before an outstanding request with lower
907  * priority. Default priority is %G_PRIORITY_DEFAULT.
908  *
909  * Since: 2.34
910  **/
911 void
g_input_stream_read_bytes_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)912 g_input_stream_read_bytes_async (GInputStream          *stream,
913 				 gsize                  count,
914 				 int                    io_priority,
915 				 GCancellable          *cancellable,
916 				 GAsyncReadyCallback    callback,
917 				 gpointer               user_data)
918 {
919   GTask *task;
920   guchar *buf;
921 
922   task = g_task_new (stream, cancellable, callback, user_data);
923   g_task_set_source_tag (task, g_input_stream_read_bytes_async);
924 
925   buf = g_malloc (count);
926   g_task_set_task_data (task, buf, NULL);
927 
928   g_input_stream_read_async (stream, buf, count,
929                              io_priority, cancellable,
930                              read_bytes_callback, task);
931 }
932 
933 /**
934  * g_input_stream_read_bytes_finish:
935  * @stream: a #GInputStream.
936  * @result: a #GAsyncResult.
937  * @error: a #GError location to store the error occurring, or %NULL to
938  *   ignore.
939  *
940  * Finishes an asynchronous stream read-into-#GBytes operation.
941  *
942  * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
943  *
944  * Since: 2.34
945  **/
946 GBytes *
g_input_stream_read_bytes_finish(GInputStream * stream,GAsyncResult * result,GError ** error)947 g_input_stream_read_bytes_finish (GInputStream  *stream,
948 				  GAsyncResult  *result,
949 				  GError       **error)
950 {
951   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
952   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
953 
954   return g_task_propagate_pointer (G_TASK (result), error);
955 }
956 
957 /**
958  * g_input_stream_skip_async:
959  * @stream: A #GInputStream.
960  * @count: the number of bytes that will be skipped from the stream
961  * @io_priority: the [I/O priority][io-priority] of the request
962  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
963  * @callback: (scope async): callback to call when the request is satisfied
964  * @user_data: (closure): the data to pass to callback function
965  *
966  * Request an asynchronous skip of @count bytes from the stream.
967  * When the operation is finished @callback will be called.
968  * You can then call g_input_stream_skip_finish() to get the result
969  * of the operation.
970  *
971  * During an async request no other sync and async calls are allowed,
972  * and will result in %G_IO_ERROR_PENDING errors.
973  *
974  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
975  *
976  * On success, the number of bytes skipped will be passed to the callback.
977  * It is not an error if this is not the same as the requested size, as it
978  * can happen e.g. near the end of a file, but generally we try to skip
979  * as many bytes as requested. Zero is returned on end of file
980  * (or if @count is zero), but never otherwise.
981  *
982  * Any outstanding i/o request with higher priority (lower numerical value)
983  * will be executed before an outstanding request with lower priority.
984  * Default priority is %G_PRIORITY_DEFAULT.
985  *
986  * The asynchronous methods have a default fallback that uses threads to
987  * implement asynchronicity, so they are optional for inheriting classes.
988  * However, if you override one, you must override all.
989  **/
990 void
g_input_stream_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)991 g_input_stream_skip_async (GInputStream        *stream,
992 			   gsize                count,
993 			   int                  io_priority,
994 			   GCancellable        *cancellable,
995 			   GAsyncReadyCallback  callback,
996 			   gpointer             user_data)
997 {
998   GInputStreamClass *class;
999   GError *error = NULL;
1000 
1001   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1002 
1003   if (count == 0)
1004     {
1005       GTask *task;
1006 
1007       task = g_task_new (stream, cancellable, callback, user_data);
1008       g_task_set_source_tag (task, g_input_stream_skip_async);
1009       g_task_return_int (task, 0);
1010       g_object_unref (task);
1011       return;
1012     }
1013 
1014   if (((gssize) count) < 0)
1015     {
1016       g_task_report_new_error (stream, callback, user_data,
1017                                g_input_stream_skip_async,
1018                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1019                                _("Too large count value passed to %s"),
1020                                G_STRFUNC);
1021       return;
1022     }
1023 
1024   if (!g_input_stream_set_pending (stream, &error))
1025     {
1026       g_task_report_error (stream, callback, user_data,
1027                            g_input_stream_skip_async,
1028                            error);
1029       return;
1030     }
1031 
1032   class = G_INPUT_STREAM_GET_CLASS (stream);
1033   stream->priv->outstanding_callback = callback;
1034   g_object_ref (stream);
1035   class->skip_async (stream, count, io_priority, cancellable,
1036 		     async_ready_callback_wrapper, user_data);
1037 }
1038 
1039 /**
1040  * g_input_stream_skip_finish:
1041  * @stream: a #GInputStream.
1042  * @result: a #GAsyncResult.
1043  * @error: a #GError location to store the error occurring, or %NULL to
1044  * ignore.
1045  *
1046  * Finishes a stream skip operation.
1047  *
1048  * Returns: the size of the bytes skipped, or `-1` on error.
1049  **/
1050 gssize
g_input_stream_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1051 g_input_stream_skip_finish (GInputStream  *stream,
1052 			    GAsyncResult  *result,
1053 			    GError       **error)
1054 {
1055   GInputStreamClass *class;
1056 
1057   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1058   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1059 
1060   if (g_async_result_legacy_propagate_error (result, error))
1061     return -1;
1062   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1063     return g_task_propagate_int (G_TASK (result), error);
1064 
1065   class = G_INPUT_STREAM_GET_CLASS (stream);
1066   return class->skip_finish (stream, result, error);
1067 }
1068 
1069 /**
1070  * g_input_stream_close_async:
1071  * @stream: A #GInputStream.
1072  * @io_priority: the [I/O priority][io-priority] of the request
1073  * @cancellable: (nullable): optional cancellable object
1074  * @callback: (scope async): callback to call when the request is satisfied
1075  * @user_data: (closure): the data to pass to callback function
1076  *
1077  * Requests an asynchronous closes of the stream, releasing resources related to it.
1078  * When the operation is finished @callback will be called.
1079  * You can then call g_input_stream_close_finish() to get the result of the
1080  * operation.
1081  *
1082  * For behaviour details see g_input_stream_close().
1083  *
1084  * The asynchronous methods have a default fallback that uses threads to implement
1085  * asynchronicity, so they are optional for inheriting classes. However, if you
1086  * override one you must override all.
1087  **/
1088 void
g_input_stream_close_async(GInputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1089 g_input_stream_close_async (GInputStream        *stream,
1090 			    int                  io_priority,
1091 			    GCancellable        *cancellable,
1092 			    GAsyncReadyCallback  callback,
1093 			    gpointer             user_data)
1094 {
1095   GInputStreamClass *class;
1096   GError *error = NULL;
1097 
1098   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1099 
1100   if (stream->priv->closed)
1101     {
1102       GTask *task;
1103 
1104       task = g_task_new (stream, cancellable, callback, user_data);
1105       g_task_set_source_tag (task, g_input_stream_close_async);
1106       g_task_return_boolean (task, TRUE);
1107       g_object_unref (task);
1108       return;
1109     }
1110 
1111   if (!g_input_stream_set_pending (stream, &error))
1112     {
1113       g_task_report_error (stream, callback, user_data,
1114                            g_input_stream_close_async,
1115                            error);
1116       return;
1117     }
1118 
1119   class = G_INPUT_STREAM_GET_CLASS (stream);
1120   stream->priv->outstanding_callback = callback;
1121   g_object_ref (stream);
1122   class->close_async (stream, io_priority, cancellable,
1123 		      async_ready_close_callback_wrapper, user_data);
1124 }
1125 
1126 /**
1127  * g_input_stream_close_finish:
1128  * @stream: a #GInputStream.
1129  * @result: a #GAsyncResult.
1130  * @error: a #GError location to store the error occurring, or %NULL to
1131  * ignore.
1132  *
1133  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1134  *
1135  * Returns: %TRUE if the stream was closed successfully.
1136  **/
1137 gboolean
g_input_stream_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1138 g_input_stream_close_finish (GInputStream  *stream,
1139 			     GAsyncResult  *result,
1140 			     GError       **error)
1141 {
1142   GInputStreamClass *class;
1143 
1144   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1145   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1146 
1147   if (g_async_result_legacy_propagate_error (result, error))
1148     return FALSE;
1149   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1150     return g_task_propagate_boolean (G_TASK (result), error);
1151 
1152   class = G_INPUT_STREAM_GET_CLASS (stream);
1153   return class->close_finish (stream, result, error);
1154 }
1155 
1156 /**
1157  * g_input_stream_is_closed:
1158  * @stream: input stream.
1159  *
1160  * Checks if an input stream is closed.
1161  *
1162  * Returns: %TRUE if the stream is closed.
1163  **/
1164 gboolean
g_input_stream_is_closed(GInputStream * stream)1165 g_input_stream_is_closed (GInputStream *stream)
1166 {
1167   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1168 
1169   return stream->priv->closed;
1170 }
1171 
1172 /**
1173  * g_input_stream_has_pending:
1174  * @stream: input stream.
1175  *
1176  * Checks if an input stream has pending actions.
1177  *
1178  * Returns: %TRUE if @stream has pending actions.
1179  **/
1180 gboolean
g_input_stream_has_pending(GInputStream * stream)1181 g_input_stream_has_pending (GInputStream *stream)
1182 {
1183   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1184 
1185   return stream->priv->pending;
1186 }
1187 
1188 /**
1189  * g_input_stream_set_pending:
1190  * @stream: input stream
1191  * @error: a #GError location to store the error occurring, or %NULL to
1192  * ignore.
1193  *
1194  * Sets @stream to have actions pending. If the pending flag is
1195  * already set or @stream is closed, it will return %FALSE and set
1196  * @error.
1197  *
1198  * Returns: %TRUE if pending was previously unset and is now set.
1199  **/
1200 gboolean
g_input_stream_set_pending(GInputStream * stream,GError ** error)1201 g_input_stream_set_pending (GInputStream *stream, GError **error)
1202 {
1203   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1204 
1205   if (stream->priv->closed)
1206     {
1207       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1208                            _("Stream is already closed"));
1209       return FALSE;
1210     }
1211 
1212   if (stream->priv->pending)
1213     {
1214       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1215 		/* Translators: This is an error you get if there is already an
1216 		 * operation running against this stream when you try to start
1217 		 * one */
1218 		 _("Stream has outstanding operation"));
1219       return FALSE;
1220     }
1221 
1222   stream->priv->pending = TRUE;
1223   return TRUE;
1224 }
1225 
1226 /**
1227  * g_input_stream_clear_pending:
1228  * @stream: input stream
1229  *
1230  * Clears the pending flag on @stream.
1231  **/
1232 void
g_input_stream_clear_pending(GInputStream * stream)1233 g_input_stream_clear_pending (GInputStream *stream)
1234 {
1235   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1236 
1237   stream->priv->pending = FALSE;
1238 }
1239 
1240 /*< internal >
1241  * g_input_stream_async_read_is_via_threads:
1242  * @stream: input stream
1243  *
1244  * Checks if an input stream's read_async function uses threads.
1245  *
1246  * Returns: %TRUE if @stream's read_async function uses threads.
1247  **/
1248 gboolean
g_input_stream_async_read_is_via_threads(GInputStream * stream)1249 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1250 {
1251   GInputStreamClass *class;
1252 
1253   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1254 
1255   class = G_INPUT_STREAM_GET_CLASS (stream);
1256 
1257   return (class->read_async == g_input_stream_real_read_async &&
1258       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1259         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1260 }
1261 
1262 /*< internal >
1263  * g_input_stream_async_close_is_via_threads:
1264  * @stream: input stream
1265  *
1266  * Checks if an input stream's close_async function uses threads.
1267  *
1268  * Returns: %TRUE if @stream's close_async function uses threads.
1269  **/
1270 gboolean
g_input_stream_async_close_is_via_threads(GInputStream * stream)1271 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1272 {
1273   GInputStreamClass *class;
1274 
1275   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1276 
1277   class = G_INPUT_STREAM_GET_CLASS (stream);
1278 
1279   return class->close_async == g_input_stream_real_close_async;
1280 }
1281 
1282 /********************************************
1283  *   Default implementation of async ops    *
1284  ********************************************/
1285 
/* Arguments of a default-implementation async read, kept as the task
 * data of the GTask created by g_input_stream_real_read_async().
 */
typedef struct {
  void   *buffer;   /* caller-supplied destination buffer */
  gsize   count;    /* maximum number of bytes to read into @buffer */
} ReadData;
1290 
1291 static void
free_read_data(ReadData * op)1292 free_read_data (ReadData *op)
1293 {
1294   g_slice_free (ReadData, op);
1295 }
1296 
1297 static void
read_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1298 read_async_thread (GTask        *task,
1299                    gpointer      source_object,
1300                    gpointer      task_data,
1301                    GCancellable *cancellable)
1302 {
1303   GInputStream *stream = source_object;
1304   ReadData *op = task_data;
1305   GInputStreamClass *class;
1306   GError *error = NULL;
1307   gssize nread;
1308 
1309   class = G_INPUT_STREAM_GET_CLASS (stream);
1310 
1311   nread = class->read_fn (stream,
1312                           op->buffer, op->count,
1313                           g_task_get_cancellable (task),
1314                           &error);
1315   if (nread == -1)
1316     g_task_return_error (task, error);
1317   else
1318     g_task_return_int (task, nread);
1319 }
1320 
/* Forward declaration: read_async_pollable_ready() below calls
 * read_async_pollable(), which in turn installs
 * read_async_pollable_ready() as a source callback.
 */
static void read_async_pollable (GPollableInputStream *stream,
                                 GTask                *task);
1323 
1324 static gboolean
read_async_pollable_ready(GPollableInputStream * stream,gpointer user_data)1325 read_async_pollable_ready (GPollableInputStream *stream,
1326 			   gpointer              user_data)
1327 {
1328   GTask *task = user_data;
1329 
1330   read_async_pollable (stream, task);
1331   return FALSE;
1332 }
1333 
1334 static void
read_async_pollable(GPollableInputStream * stream,GTask * task)1335 read_async_pollable (GPollableInputStream *stream,
1336                      GTask                *task)
1337 {
1338   ReadData *op = g_task_get_task_data (task);
1339   GError *error = NULL;
1340   gssize nread;
1341 
1342   if (g_task_return_error_if_cancelled (task))
1343     return;
1344 
1345   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1346     read_nonblocking (stream, op->buffer, op->count, &error);
1347 
1348   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1349     {
1350       GSource *source;
1351 
1352       g_error_free (error);
1353 
1354       source = g_pollable_input_stream_create_source (stream,
1355                                                       g_task_get_cancellable (task));
1356       g_task_attach_source (task, source,
1357                             (GSourceFunc) read_async_pollable_ready);
1358       g_source_unref (source);
1359       return;
1360     }
1361 
1362   if (nread == -1)
1363     g_task_return_error (task, error);
1364   else
1365     g_task_return_int (task, nread);
1366   /* g_input_stream_real_read_async() unrefs task */
1367 }
1368 
1369 
1370 static void
g_input_stream_real_read_async(GInputStream * stream,void * buffer,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1371 g_input_stream_real_read_async (GInputStream        *stream,
1372 				void                *buffer,
1373 				gsize                count,
1374 				int                  io_priority,
1375 				GCancellable        *cancellable,
1376 				GAsyncReadyCallback  callback,
1377 				gpointer             user_data)
1378 {
1379   GTask *task;
1380   ReadData *op;
1381 
1382   op = g_slice_new0 (ReadData);
1383   task = g_task_new (stream, cancellable, callback, user_data);
1384   g_task_set_source_tag (task, g_input_stream_real_read_async);
1385   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1386   g_task_set_priority (task, io_priority);
1387   op->buffer = buffer;
1388   op->count = count;
1389 
1390   if (!g_input_stream_async_read_is_via_threads (stream))
1391     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1392   else
1393     g_task_run_in_thread (task, read_async_thread);
1394   g_object_unref (task);
1395 }
1396 
1397 static gssize
g_input_stream_real_read_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1398 g_input_stream_real_read_finish (GInputStream  *stream,
1399 				 GAsyncResult  *result,
1400 				 GError       **error)
1401 {
1402   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1403 
1404   return g_task_propagate_int (G_TASK (result), error);
1405 }
1406 
1407 
1408 static void
skip_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1409 skip_async_thread (GTask        *task,
1410                    gpointer      source_object,
1411                    gpointer      task_data,
1412                    GCancellable *cancellable)
1413 {
1414   GInputStream *stream = source_object;
1415   gsize count = GPOINTER_TO_SIZE (task_data);
1416   GInputStreamClass *class;
1417   GError *error = NULL;
1418   gssize ret;
1419 
1420   class = G_INPUT_STREAM_GET_CLASS (stream);
1421   ret = class->skip (stream, count,
1422                      g_task_get_cancellable (task),
1423                      &error);
1424   if (ret == -1)
1425     g_task_return_error (task, error);
1426   else
1427     g_task_return_int (task, ret);
1428 }
1429 
/* State of the read-based skip fallback, kept as task data while
 * skip_callback_wrapper() issues successive reads.
 */
typedef struct {
  char buffer[8192];      /* scratch space the skipped bytes are read into */
  gsize count;            /* bytes still left to skip */
  gsize count_skipped;    /* bytes skipped so far */
} SkipFallbackAsyncData;
1435 
1436 static void
skip_callback_wrapper(GObject * source_object,GAsyncResult * res,gpointer user_data)1437 skip_callback_wrapper (GObject      *source_object,
1438 		       GAsyncResult *res,
1439 		       gpointer      user_data)
1440 {
1441   GInputStreamClass *class;
1442   GTask *task = user_data;
1443   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1444   GError *error = NULL;
1445   gssize ret;
1446 
1447   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1448 
1449   if (ret > 0)
1450     {
1451       data->count -= ret;
1452       data->count_skipped += ret;
1453 
1454       if (data->count > 0)
1455 	{
1456 	  class = G_INPUT_STREAM_GET_CLASS (source_object);
1457 	  class->read_async (G_INPUT_STREAM (source_object),
1458                              data->buffer, MIN (8192, data->count),
1459                              g_task_get_priority (task),
1460                              g_task_get_cancellable (task),
1461                              skip_callback_wrapper, task);
1462 	  return;
1463 	}
1464     }
1465 
1466   if (ret == -1 &&
1467       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1468       data->count_skipped)
1469     {
1470       /* No error, return partial read */
1471       g_clear_error (&error);
1472     }
1473 
1474   if (error)
1475     g_task_return_error (task, error);
1476   else
1477     g_task_return_int (task, data->count_skipped);
1478   g_object_unref (task);
1479  }
1480 
1481 static void
g_input_stream_real_skip_async(GInputStream * stream,gsize count,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1482 g_input_stream_real_skip_async (GInputStream        *stream,
1483 				gsize                count,
1484 				int                  io_priority,
1485 				GCancellable        *cancellable,
1486 				GAsyncReadyCallback  callback,
1487 				gpointer             user_data)
1488 {
1489   GInputStreamClass *class;
1490   SkipFallbackAsyncData *data;
1491   GTask *task;
1492 
1493   class = G_INPUT_STREAM_GET_CLASS (stream);
1494 
1495   task = g_task_new (stream, cancellable, callback, user_data);
1496   g_task_set_source_tag (task, g_input_stream_real_skip_async);
1497   g_task_set_priority (task, io_priority);
1498 
1499   if (g_input_stream_async_read_is_via_threads (stream))
1500     {
1501       /* Read is thread-using async fallback.
1502        * Make skip use threads too, so that we can use a possible sync skip
1503        * implementation. */
1504       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1505 
1506       g_task_run_in_thread (task, skip_async_thread);
1507       g_object_unref (task);
1508     }
1509   else
1510     {
1511       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1512 
1513       /* There is a custom async read function, lets use that. */
1514       data = g_new (SkipFallbackAsyncData, 1);
1515       data->count = count;
1516       data->count_skipped = 0;
1517       g_task_set_task_data (task, data, g_free);
1518       g_task_set_check_cancellable (task, FALSE);
1519       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1520 			 skip_callback_wrapper, task);
1521     }
1522 
1523 }
1524 
1525 static gssize
g_input_stream_real_skip_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1526 g_input_stream_real_skip_finish (GInputStream  *stream,
1527 				 GAsyncResult  *result,
1528 				 GError       **error)
1529 {
1530   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1531 
1532   return g_task_propagate_int (G_TASK (result), error);
1533 }
1534 
1535 static void
close_async_thread(GTask * task,gpointer source_object,gpointer task_data,GCancellable * cancellable)1536 close_async_thread (GTask        *task,
1537                     gpointer      source_object,
1538                     gpointer      task_data,
1539                     GCancellable *cancellable)
1540 {
1541   GInputStream *stream = source_object;
1542   GInputStreamClass *class;
1543   GError *error = NULL;
1544   gboolean result;
1545 
1546   class = G_INPUT_STREAM_GET_CLASS (stream);
1547   if (class->close_fn)
1548     {
1549       result = class->close_fn (stream,
1550                                 g_task_get_cancellable (task),
1551                                 &error);
1552       if (!result)
1553         {
1554           g_task_return_error (task, error);
1555           return;
1556         }
1557     }
1558 
1559   g_task_return_boolean (task, TRUE);
1560 }
1561 
1562 static void
g_input_stream_real_close_async(GInputStream * stream,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)1563 g_input_stream_real_close_async (GInputStream        *stream,
1564 				 int                  io_priority,
1565 				 GCancellable        *cancellable,
1566 				 GAsyncReadyCallback  callback,
1567 				 gpointer             user_data)
1568 {
1569   GTask *task;
1570 
1571   task = g_task_new (stream, cancellable, callback, user_data);
1572   g_task_set_source_tag (task, g_input_stream_real_close_async);
1573   g_task_set_check_cancellable (task, FALSE);
1574   g_task_set_priority (task, io_priority);
1575 
1576   g_task_run_in_thread (task, close_async_thread);
1577   g_object_unref (task);
1578 }
1579 
1580 static gboolean
g_input_stream_real_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)1581 g_input_stream_real_close_finish (GInputStream  *stream,
1582 				  GAsyncResult  *result,
1583 				  GError       **error)
1584 {
1585   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1586 
1587   return g_task_propagate_boolean (G_TASK (result), error);
1588 }
1589