• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GLib testing framework examples and tests
2  * Copyright (C) 2008 Red Hat, Inc
3  *
4  * This work is provided "as is"; redistribution and modification
5  * in whole or in part, in any medium, physical or electronic is
6  * permitted without restriction.
7  *
8  * This work is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11  *
12  * In no event shall the authors or contributors be liable for any
13  * direct, indirect, incidental, special, exemplary, or consequential
14  * damages (including, but not limited to, procurement of substitute
15  * goods or services; loss of use, data, or profits; or business
16  * interruption) however caused and on any theory of liability, whether
17  * in contract, strict liability, or tort (including negligence or
18  * otherwise) arising in any way out of the use of this software, even
19  * if advised of the possibility of such damage.
20  */
21 
22 #include <gio/gio.h>
23 #include <gio/gunixinputstream.h>
24 #include <gio/gunixoutputstream.h>
25 #include <glib/glib-unix.h>
26 #include <signal.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <fcntl.h>
31 
32 #define DATA "abcdefghijklmnopqrstuvwxyz"
33 
34 int writer_pipe[2], reader_pipe[2];
35 GCancellable *writer_cancel, *reader_cancel, *main_cancel;
36 GMainLoop *loop;
37 
38 
39 static gpointer
writer_thread(gpointer user_data)40 writer_thread (gpointer user_data)
41 {
42   GOutputStream *out;
43   gssize nwrote, offset;
44   GError *err = NULL;
45 
46   out = g_unix_output_stream_new (writer_pipe[1], TRUE);
47 
48   do
49     {
50       g_usleep (10);
51 
52       offset = 0;
53       while (offset < (gssize) sizeof (DATA))
54 	{
55 	  nwrote = g_output_stream_write (out, DATA + offset,
56 					  sizeof (DATA) - offset,
57 					  writer_cancel, &err);
58 	  if (nwrote <= 0 || err != NULL)
59 	    break;
60 	  offset += nwrote;
61 	}
62 
63       g_assert (nwrote > 0 || err != NULL);
64     }
65   while (err == NULL);
66 
67   if (g_cancellable_is_cancelled (writer_cancel))
68     {
69       g_clear_error (&err);
70       g_cancellable_cancel (main_cancel);
71       g_object_unref (out);
72       return NULL;
73     }
74 
75   g_warning ("writer: %s", err->message);
76   g_assert_not_reached ();
77 }
78 
79 static gpointer
reader_thread(gpointer user_data)80 reader_thread (gpointer user_data)
81 {
82   GInputStream *in;
83   gssize nread = 0, total;
84   GError *err = NULL;
85   char buf[sizeof (DATA)];
86 
87   in = g_unix_input_stream_new (reader_pipe[0], TRUE);
88 
89   do
90     {
91       total = 0;
92       while (total < (gssize) sizeof (DATA))
93 	{
94 	  nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
95 				       reader_cancel, &err);
96 	  if (nread <= 0 || err != NULL)
97 	    break;
98 	  total += nread;
99 	}
100 
101       if (err)
102 	break;
103 
104       if (nread == 0)
105 	{
106 	  g_assert (err == NULL);
107 	  /* pipe closed */
108 	  g_object_unref (in);
109 	  return NULL;
110 	}
111 
112       g_assert_cmpstr (buf, ==, DATA);
113       g_assert (!g_cancellable_is_cancelled (reader_cancel));
114     }
115   while (err == NULL);
116 
117   g_warning ("reader: %s", err->message);
118   g_assert_not_reached ();
119 }
120 
121 char main_buf[sizeof (DATA)];
122 gssize main_len, main_offset;
123 
124 static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
125 static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
126 static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
127 
128 static void
do_main_cancel(GOutputStream * out)129 do_main_cancel (GOutputStream *out)
130 {
131   g_output_stream_close (out, NULL, NULL);
132   g_main_loop_quit (loop);
133 }
134 
135 static void
main_thread_skipped(GObject * source,GAsyncResult * res,gpointer user_data)136 main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
137 {
138   GInputStream *in = G_INPUT_STREAM (source);
139   GOutputStream *out = user_data;
140   GError *err = NULL;
141   gssize nskipped;
142 
143   nskipped = g_input_stream_skip_finish (in, res, &err);
144 
145   if (g_cancellable_is_cancelled (main_cancel))
146     {
147       do_main_cancel (out);
148       return;
149     }
150 
151   g_assert_no_error (err);
152 
153   main_offset += nskipped;
154   if (main_offset == main_len)
155     {
156       main_offset = 0;
157       g_output_stream_write_async (out, main_buf, main_len,
158                                    G_PRIORITY_DEFAULT, main_cancel,
159                                    main_thread_wrote, in);
160     }
161   else
162     {
163       g_input_stream_skip_async (in, main_len - main_offset,
164 				 G_PRIORITY_DEFAULT, main_cancel,
165 				 main_thread_skipped, out);
166     }
167 }
168 
169 static void
main_thread_read(GObject * source,GAsyncResult * res,gpointer user_data)170 main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
171 {
172   GInputStream *in = G_INPUT_STREAM (source);
173   GOutputStream *out = user_data;
174   GError *err = NULL;
175   gssize nread;
176 
177   nread = g_input_stream_read_finish (in, res, &err);
178 
179   if (g_cancellable_is_cancelled (main_cancel))
180     {
181       do_main_cancel (out);
182       g_clear_error (&err);
183       return;
184     }
185 
186   g_assert_no_error (err);
187 
188   main_offset += nread;
189   if (main_offset == sizeof (DATA))
190     {
191       main_len = main_offset;
192       main_offset = 0;
193       /* Now skip the same amount */
194       g_input_stream_skip_async (in, main_len,
195 				 G_PRIORITY_DEFAULT, main_cancel,
196 				 main_thread_skipped, out);
197     }
198   else
199     {
200       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
201 				 G_PRIORITY_DEFAULT, main_cancel,
202 				 main_thread_read, out);
203     }
204 }
205 
206 static void
main_thread_wrote(GObject * source,GAsyncResult * res,gpointer user_data)207 main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
208 {
209   GOutputStream *out = G_OUTPUT_STREAM (source);
210   GInputStream *in = user_data;
211   GError *err = NULL;
212   gssize nwrote;
213 
214   nwrote = g_output_stream_write_finish (out, res, &err);
215 
216   if (g_cancellable_is_cancelled (main_cancel))
217     {
218       do_main_cancel (out);
219       g_clear_error (&err);
220       return;
221     }
222 
223   g_assert_no_error (err);
224   g_assert_cmpint (nwrote, <=, main_len - main_offset);
225 
226   main_offset += nwrote;
227   if (main_offset == main_len)
228     {
229       main_offset = 0;
230       g_input_stream_read_async (in, main_buf, sizeof (main_buf),
231 				 G_PRIORITY_DEFAULT, main_cancel,
232 				 main_thread_read, out);
233     }
234   else
235     {
236       g_output_stream_write_async (out, main_buf + main_offset,
237 				   main_len - main_offset,
238 				   G_PRIORITY_DEFAULT, main_cancel,
239 				   main_thread_wrote, in);
240     }
241 }
242 
243 static gboolean
timeout(gpointer cancellable)244 timeout (gpointer cancellable)
245 {
246   g_cancellable_cancel (cancellable);
247   return FALSE;
248 }
249 
250 static void
test_pipe_io(gconstpointer nonblocking)251 test_pipe_io (gconstpointer nonblocking)
252 {
253   GThread *writer, *reader;
254   GInputStream *in;
255   GOutputStream *out;
256 
257   /* Split off two (additional) threads, a reader and a writer. From
258    * the writer thread, write data synchronously in small chunks,
259    * which gets alternately read and skipped asynchronously by the
260    * main thread and then (if not skipped) written asynchronously to
261    * the reader thread, which reads it synchronously. Eventually a
262    * timeout in the main thread will cause it to cancel the writer
263    * thread, which will in turn cancel the read op in the main thread,
264    * which will then close the pipe to the reader thread, causing the
265    * read op to fail.
266    */
267 
268   g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
269 
270   if (nonblocking)
271     {
272       GError *error = NULL;
273 
274       g_unix_set_fd_nonblocking (writer_pipe[0], TRUE, &error);
275       g_assert_no_error (error);
276       g_unix_set_fd_nonblocking (writer_pipe[1], TRUE, &error);
277       g_assert_no_error (error);
278       g_unix_set_fd_nonblocking (reader_pipe[0], TRUE, &error);
279       g_assert_no_error (error);
280       g_unix_set_fd_nonblocking (reader_pipe[1], TRUE, &error);
281       g_assert_no_error (error);
282     }
283 
284   writer_cancel = g_cancellable_new ();
285   reader_cancel = g_cancellable_new ();
286   main_cancel = g_cancellable_new ();
287 
288   writer = g_thread_new ("writer", writer_thread, NULL);
289   reader = g_thread_new ("reader", reader_thread, NULL);
290 
291   in = g_unix_input_stream_new (writer_pipe[0], TRUE);
292   out = g_unix_output_stream_new (reader_pipe[1], TRUE);
293 
294   g_input_stream_read_async (in, main_buf, sizeof (main_buf),
295 			     G_PRIORITY_DEFAULT, main_cancel,
296 			     main_thread_read, out);
297 
298   g_timeout_add (500, timeout, writer_cancel);
299 
300   loop = g_main_loop_new (NULL, TRUE);
301   g_main_loop_run (loop);
302   g_main_loop_unref (loop);
303 
304   g_thread_join (reader);
305   g_thread_join (writer);
306 
307   g_object_unref (main_cancel);
308   g_object_unref (reader_cancel);
309   g_object_unref (writer_cancel);
310   g_object_unref (in);
311   g_object_unref (out);
312 }
313 
314 static void
test_basic(void)315 test_basic (void)
316 {
317   GUnixInputStream *is;
318   GUnixOutputStream *os;
319   gint fd;
320   gboolean close_fd;
321 
322   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
323   g_object_get (is,
324                 "fd", &fd,
325                 "close-fd", &close_fd,
326                 NULL);
327   g_assert_cmpint (fd, ==, 0);
328   g_assert (close_fd);
329 
330   g_unix_input_stream_set_close_fd (is, FALSE);
331   g_assert (!g_unix_input_stream_get_close_fd (is));
332   g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
333 
334   g_assert (!g_input_stream_has_pending (G_INPUT_STREAM (is)));
335 
336   g_object_unref (is);
337 
338   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
339   g_object_get (os,
340                 "fd", &fd,
341                 "close-fd", &close_fd,
342                 NULL);
343   g_assert_cmpint (fd, ==, 1);
344   g_assert (close_fd);
345 
346   g_unix_output_stream_set_close_fd (os, FALSE);
347   g_assert (!g_unix_output_stream_get_close_fd (os));
348   g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
349 
350   g_assert (!g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
351 
352   g_object_unref (os);
353 }
354 
355 typedef struct {
356   GInputStream *is;
357   GOutputStream *os;
358   const guint8 *write_data;
359   guint8 *read_data;
360 } TestReadWriteData;
361 
362 static gpointer
test_read_write_write_thread(gpointer user_data)363 test_read_write_write_thread (gpointer user_data)
364 {
365   TestReadWriteData *data = user_data;
366   gsize bytes_written;
367   GError *error = NULL;
368   gboolean res;
369 
370   res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
371   g_assert_true (res);
372   g_assert_no_error (error);
373   g_assert_cmpuint (bytes_written, ==, 1024);
374 
375   return NULL;
376 }
377 
378 static gpointer
test_read_write_read_thread(gpointer user_data)379 test_read_write_read_thread (gpointer user_data)
380 {
381   TestReadWriteData *data = user_data;
382   gsize bytes_read;
383   GError *error = NULL;
384   gboolean res;
385 
386   res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
387   g_assert_true (res);
388   g_assert_no_error (error);
389   g_assert_cmpuint (bytes_read, ==, 1024);
390 
391   return NULL;
392 }
393 
394 static gpointer
test_read_write_writev_thread(gpointer user_data)395 test_read_write_writev_thread (gpointer user_data)
396 {
397   TestReadWriteData *data = user_data;
398   gsize bytes_written;
399   GError *error = NULL;
400   gboolean res;
401   GOutputVector vectors[3];
402 
403   vectors[0].buffer = data->write_data;
404   vectors[0].size = 256;
405   vectors[1].buffer = data->write_data + 256;
406   vectors[1].size = 256;
407   vectors[2].buffer = data->write_data + 512;
408   vectors[2].size = 512;
409 
410   res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
411   g_assert_true (res);
412   g_assert_no_error (error);
413   g_assert_cmpuint (bytes_written, ==, 1024);
414 
415   return NULL;
416 }
417 
418 /* test if normal writing/reading from a pipe works */
419 static void
test_read_write(gconstpointer user_data)420 test_read_write (gconstpointer user_data)
421 {
422   gboolean writev = GPOINTER_TO_INT (user_data);
423   GUnixInputStream *is;
424   GUnixOutputStream *os;
425   gint fd[2];
426   guint8 data_write[1024], data_read[1024];
427   guint i;
428   GThread *write_thread, *read_thread;
429   TestReadWriteData data;
430 
431   for (i = 0; i < sizeof (data_write); i++)
432     data_write[i] = i;
433 
434   g_assert_cmpint (pipe (fd), ==, 0);
435 
436   is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
437   os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
438 
439   data.is = G_INPUT_STREAM (is);
440   data.os = G_OUTPUT_STREAM (os);
441   data.read_data = data_read;
442   data.write_data = data_write;
443 
444   if (writev)
445     write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
446   else
447     write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
448   read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
449 
450   g_thread_join (write_thread);
451   g_thread_join (read_thread);
452 
453   g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
454 
455   g_object_unref (os);
456   g_object_unref (is);
457 }
458 
/* Test that g_pollable_output_stream_write_nonblocking() and
 * g_pollable_input_stream_read_nonblocking() correctly return WOULD_BLOCK
 * and correctly reset their status afterwards again, and that all data that
 * is written can also be read again.
 *
 * Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_write_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  GError *err = NULL;
  guint8 data_write[1024], data_read[1024];
  guint i;
  gint pipe_capacity;

  for (i = 0; i < sizeof (data_write); i++)
    data_write[i] = i;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* F_SETPIPE_SZ returns the resulting capacity on success and -1 on
   * error, so success means a positive result.  F_GETPIPE_SZ takes no
   * argument and returns the capacity as the function result. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), >, 0);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);
  /* Data moves in whole 1024-byte chunks below (each read is compared
   * against the full data_write buffer), so the capacity has to be a
   * multiple of the chunk size. */
  g_assert_cmpint (pipe_capacity % 1024, ==, 0);

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  /* Run the whole thing three times to make sure that the streams
   * reset the writability/readability state again */
  for (i = 0; i < 3; i++)
    {
      gssize written = 0, written_complete = 0;
      gssize read = 0, read_complete = 0;

      /* Fill the pipe until the stream refuses further data. */
      do
        {
          written_complete += written;
          written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
                                                                data_write,
                                                                sizeof (data_write),
                                                                NULL,
                                                                &err);
        }
      while (written > 0);

      g_assert_cmpuint (written_complete, >, 0);
      g_assert_nonnull (err);
      g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
      g_clear_error (&err);

      /* Drain it again, checking every chunk that comes back. */
      do
        {
          read_complete += read;
          read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
                                                           data_read,
                                                           sizeof (data_read),
                                                           NULL,
                                                           &err);
          if (read > 0)
            g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
        }
      while (read > 0);

      g_assert_cmpuint (read_complete, ==, written_complete);
      g_assert_nonnull (err);
      g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
      g_clear_error (&err);
    }

  g_object_unref (os);
  g_object_unref (is);
#endif  /* if F_GETPIPE_SZ */
}
536 
/* Test that g_pollable_output_stream_writev_nonblocking() and
 * g_pollable_input_stream_read_nonblocking() correctly return WOULD_BLOCK
 * and correctly reset their status afterwards again, and that all data that
 * is written can also be read again.
 *
 * Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_writev_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  GError *err = NULL;
  guint8 data_write[1024], data_read[1024];
  guint i;
  GOutputVector vectors[4];
  GPollableReturn res;
  gint pipe_capacity;

  for (i = 0; i < sizeof (data_write); i++)
    data_write[i] = i;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* F_SETPIPE_SZ returns the resulting capacity on success and -1 on
   * error, so success means a positive result.  F_GETPIPE_SZ takes no
   * argument and returns the capacity as the function result. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), >, 0);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);
  /* Data moves in whole 1024-byte chunks below (each read is compared
   * against the full data_write buffer), so the capacity has to be a
   * multiple of the chunk size. */
  g_assert_cmpint (pipe_capacity % 1024, ==, 0);

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  /* Run the whole thing three times to make sure that the streams
   * reset the writability/readability state again */
  for (i = 0; i < 3; i++)
    {
      gsize written = 0, written_complete = 0;
      gssize read = 0, read_complete = 0;

      /* Fill the pipe with four-vector writes of 1024 bytes in total
       * until the stream reports that it would block. */
      do
        {
          written_complete += written;

          vectors[0].buffer = data_write;
          vectors[0].size = 256;
          vectors[1].buffer = data_write + 256;
          vectors[1].size = 256;
          vectors[2].buffer = data_write + 512;
          vectors[2].size = 256;
          vectors[3].buffer = data_write + 768;
          vectors[3].size = 256;

          res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
                                                             vectors,
                                                             G_N_ELEMENTS (vectors),
                                                             &written,
                                                             NULL,
                                                             &err);
        }
      while (res == G_POLLABLE_RETURN_OK);

      g_assert_cmpuint (written_complete, >, 0);
      g_assert_null (err);
      g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
      /* writev() on UNIX streams either succeeds fully or not at all */
      g_assert_cmpuint (written, ==, 0);

      /* Drain the pipe again, checking every chunk that comes back. */
      do
        {
          read_complete += read;
          read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
                                                           data_read,
                                                           sizeof (data_read),
                                                           NULL,
                                                           &err);
          if (read > 0)
            g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
        }
      while (read > 0);

      g_assert_cmpuint (read_complete, ==, written_complete);
      g_assert_nonnull (err);
      g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
      g_clear_error (&err);
    }

  g_object_unref (os);
  g_object_unref (is);
#endif  /* if F_GETPIPE_SZ */
}
628 
629 #ifdef F_GETPIPE_SZ
630 static void
write_async_wouldblock_cb(GUnixOutputStream * os,GAsyncResult * result,gpointer user_data)631 write_async_wouldblock_cb (GUnixOutputStream *os,
632                            GAsyncResult      *result,
633                            gpointer           user_data)
634 {
635   gsize *bytes_written = user_data;
636   GError *err = NULL;
637 
638   g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
639   g_assert_no_error (err);
640 }
641 
642 static void
read_async_wouldblock_cb(GUnixInputStream * is,GAsyncResult * result,gpointer user_data)643 read_async_wouldblock_cb (GUnixInputStream  *is,
644                           GAsyncResult      *result,
645                           gpointer           user_data)
646 {
647   gsize *bytes_read = user_data;
648   GError *err = NULL;
649 
650   g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
651   g_assert_no_error (err);
652 }
653 #endif  /* if F_GETPIPE_SZ */
654 
/* Test that the async implementation of write_all() and read_all() in
 * G*Stream around the GPollable*Stream API is working correctly.
 *
 * Four times the pipe capacity is written and read back concurrently on the
 * default main context.  Skipped when F_GETPIPE_SZ is not available.
 */
static void
test_write_async_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  guint8 *data, *data_read;
  gsize i, data_len;
  gint pipe_capacity;
  gsize bytes_written = 0, bytes_read = 0;

  g_assert_cmpint (pipe (fd), ==, 0);

  /* FIXME: These should not be needed but otherwise
   * g_unix_output_stream_write() will block because
   *   a) the fd is writable
   *   b) writing 4x capacity will block because writes are atomic
   *   c) the fd is blocking
   *
   * See https://gitlab.gnome.org/GNOME/glib/issues/1654
   */
  g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
  g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);

  /* F_SETPIPE_SZ returns the resulting capacity on success and -1 on
   * error, so success means a positive result.  F_GETPIPE_SZ takes no
   * argument and returns the capacity as the function result. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), >, 0);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);

  data_len = 4 * (gsize) pipe_capacity;

  data = g_new (guint8, data_len);
  for (i = 0; i < data_len; i++)
    data[i] = (guint8) i;
  data_read = g_new (guint8, data_len);

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
                                   data,
                                   data_len,
                                   G_PRIORITY_DEFAULT,
                                   NULL,
                                   (GAsyncReadyCallback) write_async_wouldblock_cb,
                                   &bytes_written);

  g_input_stream_read_all_async (G_INPUT_STREAM (is),
                                 data_read,
                                 data_len,
                                 G_PRIORITY_DEFAULT,
                                 NULL,
                                 (GAsyncReadyCallback) read_async_wouldblock_cb,
                                 &bytes_read);

  /* Both results are checked below, so keep iterating until BOTH
   * callbacks have reported, not merely the first of them. */
  while (bytes_written == 0 || bytes_read == 0)
    g_main_context_iteration (NULL, TRUE);

  g_assert_cmpuint (bytes_written, ==, data_len);
  g_assert_cmpuint (bytes_read, ==, data_len);
  g_assert_cmpmem (data_read, bytes_read, data, bytes_written);

  g_free (data);
  g_free (data_read);

  g_object_unref (os);
  g_object_unref (is);
#endif  /* if F_GETPIPE_SZ */
}
727 
728 #ifdef F_GETPIPE_SZ
729 static void
writev_async_wouldblock_cb(GUnixOutputStream * os,GAsyncResult * result,gpointer user_data)730 writev_async_wouldblock_cb (GUnixOutputStream *os,
731                             GAsyncResult      *result,
732                             gpointer           user_data)
733 {
734   gsize *bytes_written = user_data;
735   GError *err = NULL;
736 
737   g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
738   g_assert_no_error (err);
739 }
740 #endif  /* if F_GETPIPE_SZ */
741 
/* Test that the async implementation of writev_all() and read_all() in
 * G*Stream around the GPollable*Stream API is working correctly.
 *
 * Four times the pipe capacity is written as four vectors and read back
 * concurrently on the default main context.  Skipped when F_GETPIPE_SZ is
 * not available.
 */
static void
test_writev_async_wouldblock (void)
{
#ifndef F_GETPIPE_SZ
  g_test_skip ("F_GETPIPE_SZ not defined");
#else  /* if F_GETPIPE_SZ */
  GUnixInputStream *is;
  GUnixOutputStream *os;
  gint fd[2];
  guint8 *data, *data_read;
  gsize i, data_len;
  gint pipe_capacity;
  gsize bytes_written = 0, bytes_read = 0;
  GOutputVector vectors[4];

  g_assert_cmpint (pipe (fd), ==, 0);

  /* FIXME: These should not be needed but otherwise
   * g_unix_output_stream_writev() will block because
   *   a) the fd is writable
   *   b) writing 4x capacity will block because writes are atomic
   *   c) the fd is blocking
   *
   * See https://gitlab.gnome.org/GNOME/glib/issues/1654
   */
  g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
  g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);

  /* F_SETPIPE_SZ returns the resulting capacity on success and -1 on
   * error, so success means a positive result.  F_GETPIPE_SZ takes no
   * argument and returns the capacity as the function result. */
  g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096), >, 0);
  pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ);
  g_assert_cmpint (pipe_capacity, >=, 4096);

  data_len = 4 * (gsize) pipe_capacity;

  data = g_new (guint8, data_len);
  for (i = 0; i < data_len; i++)
    data[i] = (guint8) i;
  data_read = g_new (guint8, data_len);

  /* Three 1024-byte vectors followed by one covering the rest; the
   * capacity check above guarantees that the last size is positive. */
  vectors[0].buffer = data;
  vectors[0].size = 1024;
  vectors[1].buffer = data + 1024;
  vectors[1].size = 1024;
  vectors[2].buffer = data + 2048;
  vectors[2].size = 1024;
  vectors[3].buffer = data + 3072;
  vectors[3].size = data_len - 3072;

  is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
  os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));

  g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
                                    vectors,
                                    G_N_ELEMENTS (vectors),
                                    G_PRIORITY_DEFAULT,
                                    NULL,
                                    (GAsyncReadyCallback) writev_async_wouldblock_cb,
                                    &bytes_written);

  g_input_stream_read_all_async (G_INPUT_STREAM (is),
                                 data_read,
                                 data_len,
                                 G_PRIORITY_DEFAULT,
                                 NULL,
                                 (GAsyncReadyCallback) read_async_wouldblock_cb,
                                 &bytes_read);

  /* Both results are checked below, so keep iterating until BOTH
   * callbacks have reported, not merely the first of them. */
  while (bytes_written == 0 || bytes_read == 0)
    g_main_context_iteration (NULL, TRUE);

  g_assert_cmpuint (bytes_written, ==, data_len);
  g_assert_cmpuint (bytes_read, ==, data_len);
  g_assert_cmpmem (data_read, bytes_read, data, bytes_written);

  g_free (data);
  g_free (data_read);

  g_object_unref (os);
  g_object_unref (is);
#endif  /* F_GETPIPE_SZ */
}
824 
825 int
main(int argc,char * argv[])826 main (int   argc,
827       char *argv[])
828 {
829   g_test_init (&argc, &argv, NULL);
830 
831   g_test_add_func ("/unix-streams/basic", test_basic);
832   g_test_add_data_func ("/unix-streams/pipe-io-test",
833 			GINT_TO_POINTER (FALSE),
834 			test_pipe_io);
835   g_test_add_data_func ("/unix-streams/nonblocking-io-test",
836 			GINT_TO_POINTER (TRUE),
837 			test_pipe_io);
838 
839   g_test_add_data_func ("/unix-streams/read_write",
840                         GINT_TO_POINTER (FALSE),
841                         test_read_write);
842 
843   g_test_add_data_func ("/unix-streams/read_writev",
844                         GINT_TO_POINTER (TRUE),
845                         test_read_write);
846 
847   g_test_add_func ("/unix-streams/write-wouldblock",
848 		   test_write_wouldblock);
849   g_test_add_func ("/unix-streams/writev-wouldblock",
850 		   test_writev_wouldblock);
851 
852   g_test_add_func ("/unix-streams/write-async-wouldblock",
853 		   test_write_async_wouldblock);
854   g_test_add_func ("/unix-streams/writev-async-wouldblock",
855 		   test_writev_async_wouldblock);
856 
857   return g_test_run();
858 }
859