• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/* GLib testing framework examples and tests
 * Copyright (C) 2008 Red Hat, Inc
 *
 * This work is provided "as is"; redistribution and modification
 * in whole or in part, in any medium, physical or electronic is
 * permitted without restriction.
 *
 * This work is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * In no event shall the authors or contributors be liable for any
 * direct, indirect, incidental, special, exemplary, or consequential
 * damages (including, but not limited to, procurement of substitute
 * goods or services; loss of use, data, or profits; or business
 * interruption) however caused and on any theory of liability, whether
 * in contract, strict liability, or tort (including negligence or
 * otherwise) arising in any way out of the use of this software, even
 * if advised of the possibility of such damage.
 */
21 
22 #include <gio/gio.h>
23 #include <gio/gunixinputstream.h>
24 #include <gio/gunixoutputstream.h>
25 #include <glib.h>
26 #include <glib/glib-unix.h>
27 #include <signal.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <fcntl.h>
32 
33 /* sizeof(DATA) will give the number of bytes in the array, plus the terminating nul */
34 static const gchar DATA[] = "abcdefghijklmnopqrstuvwxyz";
35 
36 int writer_pipe[2], reader_pipe[2];
37 GCancellable *writer_cancel, *reader_cancel, *main_cancel;
38 GMainLoop *loop;
39 
40 
41 static gpointer
writer_thread(gpointer user_data)42 writer_thread (gpointer user_data)
43 {
44   GOutputStream *out;
45   gssize nwrote, offset;
46   GError *err = NULL;
47 
48   out = g_unix_output_stream_new (writer_pipe[1], TRUE);
49 
50   do
51     {
52       g_usleep (10);
53 
54       offset = 0;
55       while (offset < (gssize) sizeof (DATA))
56 	{
57 	  nwrote = g_output_stream_write (out, DATA + offset,
58 					  sizeof (DATA) - offset,
59 					  writer_cancel, &err);
60 	  if (nwrote <= 0 || err != NULL)
61 	    break;
62 	  offset += nwrote;
63 	}
64 
65       g_assert (nwrote > 0 || err != NULL);
66     }
67   while (err == NULL);
68 
69   if (g_cancellable_is_cancelled (writer_cancel))
70     {
71       g_clear_error (&err);
72       g_cancellable_cancel (main_cancel);
73       g_object_unref (out);
74       return NULL;
75     }
76 
77   g_warning ("writer: %s", err->message);
78   g_assert_not_reached ();
79 }
80 
81 static gpointer
reader_thread(gpointer user_data)82 reader_thread (gpointer user_data)
83 {
84   GInputStream *in;
85   gssize nread = 0, total;
86   GError *err = NULL;
87   char buf[sizeof (DATA)];
88 
89   in = g_unix_input_stream_new (reader_pipe[0], TRUE);
90 
91   do
92     {
93       total = 0;
94       while (total < (gssize) sizeof (DATA))
95 	{
96 	  nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
97 				       reader_cancel, &err);
98 	  if (nread <= 0 || err != NULL)
99 	    break;
100 	  total += nread;
101 	}
102 
103       if (err)
104 	break;
105 
106       if (nread == 0)
107 	{
108 	  g_assert (err == NULL);
109 	  /* pipe closed */
110 	  g_object_unref (in);
111 	  return NULL;
112 	}
113 
114       g_assert_cmpstr (buf, ==, DATA);
115       g_assert (!g_cancellable_is_cancelled (reader_cancel));
116     }
117   while (err == NULL);
118 
119   g_warning ("reader: %s", err->message);
120   g_assert_not_reached ();
121 }
122 
123 static char main_buf[sizeof (DATA)];
124 static gssize main_len, main_offset;
125 
126 static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
127 static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
128 static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
129 
130 static void
do_main_cancel(GOutputStream * out)131 do_main_cancel (GOutputStream *out)
132 {
133   g_output_stream_close (out, NULL, NULL);
134   g_main_loop_quit (loop);
135 }
136 
137 static void
main_thread_skipped(GObject * source,GAsyncResult * res,gpointer user_data)138 main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
139 {
140   GInputStream *in = G_INPUT_STREAM (source);
141   GOutputStream *out = user_data;
142   GError *err = NULL;
143   gssize nskipped;
144 
145   nskipped = g_input_stream_skip_finish (in, res, &err);
146 
147   if (g_cancellable_is_cancelled (main_cancel))
148     {
149       do_main_cancel (out);
150       return;
151     }
152 
153   g_assert_no_error (err);
154 
155   main_offset += nskipped;
156   if (main_offset == main_len)
157     {
158       main_offset = 0;
159       g_output_stream_write_async (out, main_buf, main_len,
160                                    G_PRIORITY_DEFAULT, main_cancel,
161                                    main_thread_wrote, in);
162     }
163   else
164     {
165       g_input_stream_skip_async (in, main_len - main_offset,
166 				 G_PRIORITY_DEFAULT, main_cancel,
167 				 main_thread_skipped, out);
168     }
169 }
170 
171 static void
main_thread_read(GObject * source,GAsyncResult * res,gpointer user_data)172 main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
173 {
174   GInputStream *in = G_INPUT_STREAM (source);
175   GOutputStream *out = user_data;
176   GError *err = NULL;
177   gssize nread;
178 
179   nread = g_input_stream_read_finish (in, res, &err);
180 
181   if (g_cancellable_is_cancelled (main_cancel))
182     {
183       do_main_cancel (out);
184       g_clear_error (&err);
185       return;
186     }
187 
188   g_assert_no_error (err);
189 
190   main_offset += nread;
191   if (main_offset == sizeof (DATA))
192     {
193       main_len = main_offset;
194       main_offset = 0;
195       /* Now skip the same amount */
196       g_input_stream_skip_async (in, main_len,
197 				 G_PRIORITY_DEFAULT, main_cancel,
198 				 main_thread_skipped, out);
199     }
200   else
201     {
202       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
203 				 G_PRIORITY_DEFAULT, main_cancel,
204 				 main_thread_read, out);
205     }
206 }
207 
208 static void
main_thread_wrote(GObject * source,GAsyncResult * res,gpointer user_data)209 main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
210 {
211   GOutputStream *out = G_OUTPUT_STREAM (source);
212   GInputStream *in = user_data;
213   GError *err = NULL;
214   gssize nwrote;
215 
216   nwrote = g_output_stream_write_finish (out, res, &err);
217 
218   if (g_cancellable_is_cancelled (main_cancel))
219     {
220       do_main_cancel (out);
221       g_clear_error (&err);
222       return;
223     }
224 
225   g_assert_no_error (err);
226   g_assert_cmpint (nwrote, <=, main_len - main_offset);
227 
228   main_offset += nwrote;
229   if (main_offset == main_len)
230     {
231       main_offset = 0;
232       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
233 				 G_PRIORITY_DEFAULT, main_cancel,
234 				 main_thread_read, out);
235     }
236   else
237     {
238       g_output_stream_write_async (out, main_buf + main_offset,
239 				   main_len - main_offset,
240 				   G_PRIORITY_DEFAULT, main_cancel,
241 				   main_thread_wrote, in);
242     }
243 }
244 
245 static gboolean
timeout(gpointer cancellable)246 timeout (gpointer cancellable)
247 {
248   g_cancellable_cancel (cancellable);
249   return FALSE;
250 }
251 
252 static void
test_pipe_io(gconstpointer nonblocking)253 test_pipe_io (gconstpointer nonblocking)
254 {
255   GThread *writer, *reader;
256   GInputStream *in;
257   GOutputStream *out;
258 
259   /* Split off two (additional) threads, a reader and a writer. From
260    * the writer thread, write data synchronously in small chunks,
261    * which gets alternately read and skipped asynchronously by the
262    * main thread and then (if not skipped) written asynchronously to
263    * the reader thread, which reads it synchronously. Eventually a
264    * timeout in the main thread will cause it to cancel the writer
265    * thread, which will in turn cancel the read op in the main thread,
266    * which will then close the pipe to the reader thread, causing the
267    * read op to fail.
268    */
269 
270   g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
271 
272   if (nonblocking)
273     {
274       GError *error = NULL;
275 
276       g_unix_set_fd_nonblocking (writer_pipe[0], TRUE, &error);
277       g_assert_no_error (error);
278       g_unix_set_fd_nonblocking (writer_pipe[1], TRUE, &error);
279       g_assert_no_error (error);
280       g_unix_set_fd_nonblocking (reader_pipe[0], TRUE, &error);
281       g_assert_no_error (error);
282       g_unix_set_fd_nonblocking (reader_pipe[1], TRUE, &error);
283       g_assert_no_error (error);
284     }
285 
286   writer_cancel = g_cancellable_new ();
287   reader_cancel = g_cancellable_new ();
288   main_cancel = g_cancellable_new ();
289 
290   writer = g_thread_new ("writer", writer_thread, NULL);
291   reader = g_thread_new ("reader", reader_thread, NULL);
292 
293   in = g_unix_input_stream_new (writer_pipe[0], TRUE);
294   out = g_unix_output_stream_new (reader_pipe[1], TRUE);
295 
296   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
297 			     G_PRIORITY_DEFAULT, main_cancel,
298 			     main_thread_read, out);
299 
300   g_timeout_add (500, timeout, writer_cancel);
301 
302   loop = g_main_loop_new (NULL, TRUE);
303   g_main_loop_run (loop);
304   g_main_loop_unref (loop);
305 
306   g_thread_join (reader);
307   g_thread_join (writer);
308 
309   g_object_unref (main_cancel);
310   g_object_unref (reader_cancel);
311   g_object_unref (writer_cancel);
312   g_object_unref (in);
313   g_object_unref (out);
314 }
315 
316 static void
test_basic(void)317 test_basic (void)
318 {
319   GUnixInputStream *is;
320   GUnixOutputStream *os;
321   gint fd;
322   gboolean close_fd;
323 
324   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
325   g_object_get (is,
326                 "fd", &fd,
327                 "close-fd", &close_fd,
328                 NULL);
329   g_assert_cmpint (fd, ==, 0);
330   g_assert (close_fd);
331 
332   g_unix_input_stream_set_close_fd (is, FALSE);
333   g_assert (!g_unix_input_stream_get_close_fd (is));
334   g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
335 
336   g_assert (!g_input_stream_has_pending (G_INPUT_STREAM (is)));
337 
338   g_object_unref (is);
339 
340   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
341   g_object_get (os,
342                 "fd", &fd,
343                 "close-fd", &close_fd,
344                 NULL);
345   g_assert_cmpint (fd, ==, 1);
346   g_assert (close_fd);
347 
348   g_unix_output_stream_set_close_fd (os, FALSE);
349   g_assert (!g_unix_output_stream_get_close_fd (os));
350   g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
351 
352   g_assert (!g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
353 
354   g_object_unref (os);
355 }
356 
357 typedef struct {
358   GInputStream *is;
359   GOutputStream *os;
360   const guint8 *write_data;
361   guint8 *read_data;
362 } TestReadWriteData;
363 
364 static gpointer
test_read_write_write_thread(gpointer user_data)365 test_read_write_write_thread (gpointer user_data)
366 {
367   TestReadWriteData *data = user_data;
368   gsize bytes_written;
369   GError *error = NULL;
370   gboolean res;
371 
372   res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
373   g_assert_true (res);
374   g_assert_no_error (error);
375   g_assert_cmpuint (bytes_written, ==, 1024);
376 
377   return NULL;
378 }
379 
380 static gpointer
test_read_write_read_thread(gpointer user_data)381 test_read_write_read_thread (gpointer user_data)
382 {
383   TestReadWriteData *data = user_data;
384   gsize bytes_read;
385   GError *error = NULL;
386   gboolean res;
387 
388   res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
389   g_assert_true (res);
390   g_assert_no_error (error);
391   g_assert_cmpuint (bytes_read, ==, 1024);
392 
393   return NULL;
394 }
395 
396 static gpointer
test_read_write_writev_thread(gpointer user_data)397 test_read_write_writev_thread (gpointer user_data)
398 {
399   TestReadWriteData *data = user_data;
400   gsize bytes_written;
401   GError *error = NULL;
402   gboolean res;
403   GOutputVector vectors[3];
404 
405   vectors[0].buffer = data->write_data;
406   vectors[0].size = 256;
407   vectors[1].buffer = data->write_data + 256;
408   vectors[1].size = 256;
409   vectors[2].buffer = data->write_data + 512;
410   vectors[2].size = 512;
411 
412   res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
413   g_assert_true (res);
414   g_assert_no_error (error);
415   g_assert_cmpuint (bytes_written, ==, 1024);
416 
417   return NULL;
418 }
419 
420 /* test if normal writing/reading from a pipe works */
421 static void
test_read_write(gconstpointer user_data)422 test_read_write (gconstpointer user_data)
423 {
424   gboolean writev = GPOINTER_TO_INT (user_data);
425   GUnixInputStream *is;
426   GUnixOutputStream *os;
427   gint fd[2];
428   guint8 data_write[1024], data_read[1024];
429   guint i;
430   GThread *write_thread, *read_thread;
431   TestReadWriteData data;
432 
433   for (i = 0; i < sizeof (data_write); i++)
434     data_write[i] = i;
435 
436   g_assert_cmpint (pipe (fd), ==, 0);
437 
438   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
439   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
440 
441   data.is = G_INPUT_STREAM (is);
442   data.os = G_OUTPUT_STREAM (os);
443   data.read_data = data_read;
444   data.write_data = data_write;
445 
446   if (writev)
447     write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
448   else
449     write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
450   read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
451 
452   g_thread_join (write_thread);
453   g_thread_join (read_thread);
454 
455   g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
456 
457   g_object_unref (os);
458   g_object_unref (is);
459 }
460 
/* Test that g_pollable_output_stream_write_nonblocking() and
 * g_pollable_input_stream_read_nonblocking() correctly return WOULD_BLOCK
 * and correctly reset their status afterwards again, and that all data that
 * is written can also be read again.
 *
 * Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_write_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  GError *err = NULL;
  guint8 data_write[1024], data_read[1024];
  guint i;
  gint pipe_capacity;

  for (i = 0; i < sizeof (data_write); i++)
    data_write[i] = i;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* fcntl() reports failure as -1, so that is the value to test against. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), !=, -1);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);
  /* The loops below move data in sizeof (data_write) == 1024 byte units and
   * compare whole units, so the capacity must be a multiple of that. */
  g_assert_cmpint (pipe_capacity % 1024, ==, 0);

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  /* Run the whole thing three times to make sure that the streams
   * reset the writability/readability state again */
  for (i = 0; i < 3; i++) {
    gssize written = 0, written_complete = 0;
    gssize read = 0, read_complete = 0;

    /* Fill the pipe until the stream refuses to take more. */
    do
      {
        written_complete += written;
        written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
                                                              data_write,
                                                              sizeof (data_write),
                                                              NULL,
                                                              &err);
      }
    while (written > 0);

    g_assert_cmpuint (written_complete, >, 0);
    g_assert_nonnull (err);
    g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
    g_clear_error (&err);

    /* Drain the pipe again, checking every chunk against what was written. */
    do
      {
        read_complete += read;
        read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
                                                         data_read,
                                                         sizeof (data_read),
                                                         NULL,
                                                         &err);
        if (read > 0)
          g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
      }
    while (read > 0);

    g_assert_cmpuint (read_complete, ==, written_complete);
    g_assert_nonnull (err);
    g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
    g_clear_error (&err);
  }

  g_object_unref (os);
  g_object_unref (is);
#endif  /* if F_GETPIPE_SZ */
}
538 
/* Test that g_pollable_output_stream_writev_nonblocking() and
 * g_pollable_input_stream_read_nonblocking() correctly return WOULD_BLOCK
 * and correctly reset their status afterwards again, and that all data that
 * is written can also be read again.
 *
 * Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_writev_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  GError *err = NULL;
  guint8 data_write[1024], data_read[1024];
  guint i;
  GOutputVector vectors[4];
  GPollableReturn res;
  gint pipe_capacity;

  for (i = 0; i < sizeof (data_write); i++)
    data_write[i] = i;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* fcntl() reports failure as -1, so that is the value to test against. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), !=, -1);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);
  /* The loops below move data in sizeof (data_write) == 1024 byte units and
   * compare whole units, so the capacity must be a multiple of that. */
  g_assert_cmpint (pipe_capacity % 1024, ==, 0);

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  /* Run the whole thing three times to make sure that the streams
   * reset the writability/readability state again */
  for (i = 0; i < 3; i++) {
    gsize written = 0, written_complete = 0;
    gssize read = 0, read_complete = 0;

    /* Fill the pipe, 4 x 256 bytes at a time, until the stream refuses
     * to take more. */
    do
      {
        written_complete += written;

        vectors[0].buffer = data_write;
        vectors[0].size = 256;
        vectors[1].buffer = data_write + 256;
        vectors[1].size = 256;
        vectors[2].buffer = data_write + 512;
        vectors[2].size = 256;
        vectors[3].buffer = data_write + 768;
        vectors[3].size = 256;

        res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
                                                           vectors,
                                                           G_N_ELEMENTS (vectors),
                                                           &written,
                                                           NULL,
                                                           &err);
      }
    while (res == G_POLLABLE_RETURN_OK);

    g_assert_cmpuint (written_complete, >, 0);
    g_assert_null (err);
    g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
    /* writev() on UNIX streams either succeeds fully or not at all */
    g_assert_cmpuint (written, ==, 0);

    /* Drain the pipe again, checking every chunk against what was written. */
    do
      {
        read_complete += read;
        read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
                                                         data_read,
                                                         sizeof (data_read),
                                                         NULL,
                                                         &err);
        if (read > 0)
          g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
      }
    while (read > 0);

    g_assert_cmpuint (read_complete, ==, written_complete);
    g_assert_nonnull (err);
    g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
    g_clear_error (&err);
  }

  g_object_unref (os);
  g_object_unref (is);
#endif  /* if F_GETPIPE_SZ */
}
630 
631 #ifdef F_GETPIPE_SZ
632 static void
write_async_wouldblock_cb(GUnixOutputStream * os,GAsyncResult * result,gpointer user_data)633 write_async_wouldblock_cb (GUnixOutputStream *os,
634                            GAsyncResult      *result,
635                            gpointer           user_data)
636 {
637   gsize *bytes_written = user_data;
638   GError *err = NULL;
639 
640   g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
641   g_assert_no_error (err);
642 }
643 
644 static void
read_async_wouldblock_cb(GUnixInputStream * is,GAsyncResult * result,gpointer user_data)645 read_async_wouldblock_cb (GUnixInputStream  *is,
646                           GAsyncResult      *result,
647                           gpointer           user_data)
648 {
649   gsize *bytes_read = user_data;
650   GError *err = NULL;
651 
652   g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
653   g_assert_no_error (err);
654 }
655 #endif  /* if F_GETPIPE_SZ */
656 
/* Test that the async implementation of write_all() and read_all() in
 * G*Stream around the GPollable*Stream API is working correctly, by pushing
 * four times the pipe capacity through a pipe.
 *
 * Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_write_async_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  guint8 *data, *data_read;
  gsize i, len;
  gint pipe_capacity;
  gsize bytes_written = 0, bytes_read = 0;
  GError *error = NULL;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* FIXME: These should not be needed but otherwise
   * g_unix_output_stream_write() will block because
   *   a) the fd is writable
   *   b) writing 4x capacity will block because writes are atomic
   *   c) the fd is blocking
   *
   * See https://gitlab.gnome.org/GNOME/glib/issues/1654
   */
  g_unix_set_fd_nonblocking (fd[0], TRUE, &error);
  g_assert_no_error (error);
  g_unix_set_fd_nonblocking (fd[1], TRUE, &error);
  g_assert_no_error (error);

  /* fcntl() reports failure as -1, so that is the value to test against. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), !=, -1);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);

  /* Transfer four times what the pipe can hold at once. */
  len = 4 * (gsize) pipe_capacity;

  data = g_new (guint8, len);
  for (i = 0; i < len; i++)
    data[i] = (guint8) i;
  data_read = g_new (guint8, len);

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
                                   data,
                                   len,
                                   G_PRIORITY_DEFAULT,
                                   NULL,
                                   (GAsyncReadyCallback) write_async_wouldblock_cb,
                                   &bytes_written);

  g_input_stream_read_all_async (G_INPUT_STREAM (is),
                                 data_read,
                                 len,
                                 G_PRIORITY_DEFAULT,
                                 NULL,
                                 (GAsyncReadyCallback) read_async_wouldblock_cb,
                                 &bytes_read);

  /* Both counters are asserted below, so keep iterating until BOTH
   * operations have reported completion, not just the first one. */
  while (bytes_written == 0 || bytes_read == 0)
    g_main_context_iteration (NULL, TRUE);

  g_assert_cmpuint (bytes_written, ==, len);
  g_assert_cmpuint (bytes_read, ==, len);
  g_assert_cmpmem (data_read, bytes_read, data, bytes_written);

  g_free (data);
  g_free (data_read);

  g_object_unref (os);
  g_object_unref (is);
#endif  /* if F_GETPIPE_SZ */
}
729 
730 #ifdef F_GETPIPE_SZ
731 static void
writev_async_wouldblock_cb(GUnixOutputStream * os,GAsyncResult * result,gpointer user_data)732 writev_async_wouldblock_cb (GUnixOutputStream *os,
733                             GAsyncResult      *result,
734                             gpointer           user_data)
735 {
736   gsize *bytes_written = user_data;
737   GError *err = NULL;
738 
739   g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
740   g_assert_no_error (err);
741 }
742 #endif  /* if F_GETPIPE_SZ */
743 
/* Test that the async implementation of writev_all() and read_all() in
 * G*Stream around the GPollable*Stream API is working correctly, by pushing
 * four times the pipe capacity through a pipe as four vectors.
 *
 * Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_writev_async_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  guint8 *data, *data_read;
  gsize i, len;
  gint pipe_capacity;
  gsize bytes_written = 0, bytes_read = 0;
  GOutputVector vectors[4];
  GError *error = NULL;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* FIXME: These should not be needed but otherwise
   * g_unix_output_stream_writev() will block because
   *   a) the fd is writable
   *   b) writing 4x capacity will block because writes are atomic
   *   c) the fd is blocking
   *
   * See https://gitlab.gnome.org/GNOME/glib/issues/1654
   */
  g_unix_set_fd_nonblocking (fd[0], TRUE, &error);
  g_assert_no_error (error);
  g_unix_set_fd_nonblocking (fd[1], TRUE, &error);
  g_assert_no_error (error);

  /* fcntl() reports failure as -1, so that is the value to test against. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), !=, -1);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);

  /* Transfer four times what the pipe can hold at once. */
  len = 4 * (gsize) pipe_capacity;

  data = g_new (guint8, len);
  for (i = 0; i < len; i++)
    data[i] = (guint8) i;
  data_read = g_new (guint8, len);

  /* Three 1024-byte vectors followed by one covering the rest of the data. */
  vectors[0].buffer = data;
  vectors[0].size = 1024;
  vectors[1].buffer = data + 1024;
  vectors[1].size = 1024;
  vectors[2].buffer = data + 2048;
  vectors[2].size = 1024;
  vectors[3].buffer = data + 3072;
  vectors[3].size = len - 3072;

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
                                    vectors,
                                    G_N_ELEMENTS (vectors),
                                    G_PRIORITY_DEFAULT,
                                    NULL,
                                    (GAsyncReadyCallback) writev_async_wouldblock_cb,
                                    &bytes_written);

  g_input_stream_read_all_async (G_INPUT_STREAM (is),
                                 data_read,
                                 len,
                                 G_PRIORITY_DEFAULT,
                                 NULL,
                                 (GAsyncReadyCallback) read_async_wouldblock_cb,
                                 &bytes_read);

  /* Both counters are asserted below, so keep iterating until BOTH
   * operations have reported completion, not just the first one. */
  while (bytes_written == 0 || bytes_read == 0)
    g_main_context_iteration (NULL, TRUE);

  g_assert_cmpuint (bytes_written, ==, len);
  g_assert_cmpuint (bytes_read, ==, len);
  g_assert_cmpmem (data_read, bytes_read, data, bytes_written);

  g_free (data);
  g_free (data_read);

  g_object_unref (os);
  g_object_unref (is);
#endif  /* F_GETPIPE_SZ */
}
826 
827 int
main(int argc,char * argv[])828 main (int   argc,
829       char *argv[])
830 {
831   g_test_init (&argc, &argv, NULL);
832 
833   g_test_add_func ("/unix-streams/basic", test_basic);
834   g_test_add_data_func ("/unix-streams/pipe-io-test",
835 			GINT_TO_POINTER (FALSE),
836 			test_pipe_io);
837   g_test_add_data_func ("/unix-streams/nonblocking-io-test",
838 			GINT_TO_POINTER (TRUE),
839 			test_pipe_io);
840 
841   g_test_add_data_func ("/unix-streams/read_write",
842                         GINT_TO_POINTER (FALSE),
843                         test_read_write);
844 
845   g_test_add_data_func ("/unix-streams/read_writev",
846                         GINT_TO_POINTER (TRUE),
847                         test_read_write);
848 
849   g_test_add_func ("/unix-streams/write-wouldblock",
850 		   test_write_wouldblock);
851   g_test_add_func ("/unix-streams/writev-wouldblock",
852 		   test_writev_wouldblock);
853 
854   g_test_add_func ("/unix-streams/write-async-wouldblock",
855 		   test_write_async_wouldblock);
856   g_test_add_func ("/unix-streams/writev-async-wouldblock",
857 		   test_writev_async_wouldblock);
858 
859   return g_test_run();
860 }
861