1 /* GLib testing framework examples and tests
2 * Copyright (C) 2008 Red Hat, Inc
3 *
4 * This work is provided "as is"; redistribution and modification
5 * in whole or in part, in any medium, physical or electronic is
6 * permitted without restriction.
7 *
8 * This work is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * In no event shall the authors or contributors be liable for any
13 * direct, indirect, incidental, special, exemplary, or consequential
14 * damages (including, but not limited to, procurement of substitute
15 * goods or services; loss of use, data, or profits; or business
16 * interruption) however caused and on any theory of liability, whether
17 * in contract, strict liability, or tort (including negligence or
18 * otherwise) arising in any way out of the use of this software, even
19 * if advised of the possibility of such damage.
20 */
21
22 #include <gio/gio.h>
23 #include <gio/gunixinputstream.h>
24 #include <gio/gunixoutputstream.h>
25 #include <glib/glib-unix.h>
26 #include <signal.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <fcntl.h>
31
32 #define DATA "abcdefghijklmnopqrstuvwxyz"
33
34 int writer_pipe[2], reader_pipe[2];
35 GCancellable *writer_cancel, *reader_cancel, *main_cancel;
36 GMainLoop *loop;
37
38
39 static gpointer
writer_thread(gpointer user_data)40 writer_thread (gpointer user_data)
41 {
42 GOutputStream *out;
43 gssize nwrote, offset;
44 GError *err = NULL;
45
46 out = g_unix_output_stream_new (writer_pipe[1], TRUE);
47
48 do
49 {
50 g_usleep (10);
51
52 offset = 0;
53 while (offset < (gssize) sizeof (DATA))
54 {
55 nwrote = g_output_stream_write (out, DATA + offset,
56 sizeof (DATA) - offset,
57 writer_cancel, &err);
58 if (nwrote <= 0 || err != NULL)
59 break;
60 offset += nwrote;
61 }
62
63 g_assert (nwrote > 0 || err != NULL);
64 }
65 while (err == NULL);
66
67 if (g_cancellable_is_cancelled (writer_cancel))
68 {
69 g_clear_error (&err);
70 g_cancellable_cancel (main_cancel);
71 g_object_unref (out);
72 return NULL;
73 }
74
75 g_warning ("writer: %s", err->message);
76 g_assert_not_reached ();
77 }
78
79 static gpointer
reader_thread(gpointer user_data)80 reader_thread (gpointer user_data)
81 {
82 GInputStream *in;
83 gssize nread = 0, total;
84 GError *err = NULL;
85 char buf[sizeof (DATA)];
86
87 in = g_unix_input_stream_new (reader_pipe[0], TRUE);
88
89 do
90 {
91 total = 0;
92 while (total < (gssize) sizeof (DATA))
93 {
94 nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
95 reader_cancel, &err);
96 if (nread <= 0 || err != NULL)
97 break;
98 total += nread;
99 }
100
101 if (err)
102 break;
103
104 if (nread == 0)
105 {
106 g_assert (err == NULL);
107 /* pipe closed */
108 g_object_unref (in);
109 return NULL;
110 }
111
112 g_assert_cmpstr (buf, ==, DATA);
113 g_assert (!g_cancellable_is_cancelled (reader_cancel));
114 }
115 while (err == NULL);
116
117 g_warning ("reader: %s", err->message);
118 g_assert_not_reached ();
119 }
120
121 char main_buf[sizeof (DATA)];
122 gssize main_len, main_offset;
123
124 static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
125 static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
126 static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
127
128 static void
do_main_cancel(GOutputStream * out)129 do_main_cancel (GOutputStream *out)
130 {
131 g_output_stream_close (out, NULL, NULL);
132 g_main_loop_quit (loop);
133 }
134
135 static void
main_thread_skipped(GObject * source,GAsyncResult * res,gpointer user_data)136 main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
137 {
138 GInputStream *in = G_INPUT_STREAM (source);
139 GOutputStream *out = user_data;
140 GError *err = NULL;
141 gssize nskipped;
142
143 nskipped = g_input_stream_skip_finish (in, res, &err);
144
145 if (g_cancellable_is_cancelled (main_cancel))
146 {
147 do_main_cancel (out);
148 return;
149 }
150
151 g_assert_no_error (err);
152
153 main_offset += nskipped;
154 if (main_offset == main_len)
155 {
156 main_offset = 0;
157 g_output_stream_write_async (out, main_buf, main_len,
158 G_PRIORITY_DEFAULT, main_cancel,
159 main_thread_wrote, in);
160 }
161 else
162 {
163 g_input_stream_skip_async (in, main_len - main_offset,
164 G_PRIORITY_DEFAULT, main_cancel,
165 main_thread_skipped, out);
166 }
167 }
168
169 static void
main_thread_read(GObject * source,GAsyncResult * res,gpointer user_data)170 main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
171 {
172 GInputStream *in = G_INPUT_STREAM (source);
173 GOutputStream *out = user_data;
174 GError *err = NULL;
175 gssize nread;
176
177 nread = g_input_stream_read_finish (in, res, &err);
178
179 if (g_cancellable_is_cancelled (main_cancel))
180 {
181 do_main_cancel (out);
182 g_clear_error (&err);
183 return;
184 }
185
186 g_assert_no_error (err);
187
188 main_offset += nread;
189 if (main_offset == sizeof (DATA))
190 {
191 main_len = main_offset;
192 main_offset = 0;
193 /* Now skip the same amount */
194 g_input_stream_skip_async (in, main_len,
195 G_PRIORITY_DEFAULT, main_cancel,
196 main_thread_skipped, out);
197 }
198 else
199 {
200 g_input_stream_read_async (in, main_buf, sizeof (main_buf),
201 G_PRIORITY_DEFAULT, main_cancel,
202 main_thread_read, out);
203 }
204 }
205
206 static void
main_thread_wrote(GObject * source,GAsyncResult * res,gpointer user_data)207 main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
208 {
209 GOutputStream *out = G_OUTPUT_STREAM (source);
210 GInputStream *in = user_data;
211 GError *err = NULL;
212 gssize nwrote;
213
214 nwrote = g_output_stream_write_finish (out, res, &err);
215
216 if (g_cancellable_is_cancelled (main_cancel))
217 {
218 do_main_cancel (out);
219 g_clear_error (&err);
220 return;
221 }
222
223 g_assert_no_error (err);
224 g_assert_cmpint (nwrote, <=, main_len - main_offset);
225
226 main_offset += nwrote;
227 if (main_offset == main_len)
228 {
229 main_offset = 0;
230 g_input_stream_read_async (in, main_buf, sizeof (main_buf),
231 G_PRIORITY_DEFAULT, main_cancel,
232 main_thread_read, out);
233 }
234 else
235 {
236 g_output_stream_write_async (out, main_buf + main_offset,
237 main_len - main_offset,
238 G_PRIORITY_DEFAULT, main_cancel,
239 main_thread_wrote, in);
240 }
241 }
242
243 static gboolean
timeout(gpointer cancellable)244 timeout (gpointer cancellable)
245 {
246 g_cancellable_cancel (cancellable);
247 return FALSE;
248 }
249
250 static void
test_pipe_io(gconstpointer nonblocking)251 test_pipe_io (gconstpointer nonblocking)
252 {
253 GThread *writer, *reader;
254 GInputStream *in;
255 GOutputStream *out;
256
257 /* Split off two (additional) threads, a reader and a writer. From
258 * the writer thread, write data synchronously in small chunks,
259 * which gets alternately read and skipped asynchronously by the
260 * main thread and then (if not skipped) written asynchronously to
261 * the reader thread, which reads it synchronously. Eventually a
262 * timeout in the main thread will cause it to cancel the writer
263 * thread, which will in turn cancel the read op in the main thread,
264 * which will then close the pipe to the reader thread, causing the
265 * read op to fail.
266 */
267
268 g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
269
270 if (nonblocking)
271 {
272 GError *error = NULL;
273
274 g_unix_set_fd_nonblocking (writer_pipe[0], TRUE, &error);
275 g_assert_no_error (error);
276 g_unix_set_fd_nonblocking (writer_pipe[1], TRUE, &error);
277 g_assert_no_error (error);
278 g_unix_set_fd_nonblocking (reader_pipe[0], TRUE, &error);
279 g_assert_no_error (error);
280 g_unix_set_fd_nonblocking (reader_pipe[1], TRUE, &error);
281 g_assert_no_error (error);
282 }
283
284 writer_cancel = g_cancellable_new ();
285 reader_cancel = g_cancellable_new ();
286 main_cancel = g_cancellable_new ();
287
288 writer = g_thread_new ("writer", writer_thread, NULL);
289 reader = g_thread_new ("reader", reader_thread, NULL);
290
291 in = g_unix_input_stream_new (writer_pipe[0], TRUE);
292 out = g_unix_output_stream_new (reader_pipe[1], TRUE);
293
294 g_input_stream_read_async (in, main_buf, sizeof (main_buf),
295 G_PRIORITY_DEFAULT, main_cancel,
296 main_thread_read, out);
297
298 g_timeout_add (500, timeout, writer_cancel);
299
300 loop = g_main_loop_new (NULL, TRUE);
301 g_main_loop_run (loop);
302 g_main_loop_unref (loop);
303
304 g_thread_join (reader);
305 g_thread_join (writer);
306
307 g_object_unref (main_cancel);
308 g_object_unref (reader_cancel);
309 g_object_unref (writer_cancel);
310 g_object_unref (in);
311 g_object_unref (out);
312 }
313
314 static void
test_basic(void)315 test_basic (void)
316 {
317 GUnixInputStream *is;
318 GUnixOutputStream *os;
319 gint fd;
320 gboolean close_fd;
321
322 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
323 g_object_get (is,
324 "fd", &fd,
325 "close-fd", &close_fd,
326 NULL);
327 g_assert_cmpint (fd, ==, 0);
328 g_assert (close_fd);
329
330 g_unix_input_stream_set_close_fd (is, FALSE);
331 g_assert (!g_unix_input_stream_get_close_fd (is));
332 g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
333
334 g_assert (!g_input_stream_has_pending (G_INPUT_STREAM (is)));
335
336 g_object_unref (is);
337
338 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
339 g_object_get (os,
340 "fd", &fd,
341 "close-fd", &close_fd,
342 NULL);
343 g_assert_cmpint (fd, ==, 1);
344 g_assert (close_fd);
345
346 g_unix_output_stream_set_close_fd (os, FALSE);
347 g_assert (!g_unix_output_stream_get_close_fd (os));
348 g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
349
350 g_assert (!g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
351
352 g_object_unref (os);
353 }
354
355 typedef struct {
356 GInputStream *is;
357 GOutputStream *os;
358 const guint8 *write_data;
359 guint8 *read_data;
360 } TestReadWriteData;
361
362 static gpointer
test_read_write_write_thread(gpointer user_data)363 test_read_write_write_thread (gpointer user_data)
364 {
365 TestReadWriteData *data = user_data;
366 gsize bytes_written;
367 GError *error = NULL;
368 gboolean res;
369
370 res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
371 g_assert_true (res);
372 g_assert_no_error (error);
373 g_assert_cmpuint (bytes_written, ==, 1024);
374
375 return NULL;
376 }
377
378 static gpointer
test_read_write_read_thread(gpointer user_data)379 test_read_write_read_thread (gpointer user_data)
380 {
381 TestReadWriteData *data = user_data;
382 gsize bytes_read;
383 GError *error = NULL;
384 gboolean res;
385
386 res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
387 g_assert_true (res);
388 g_assert_no_error (error);
389 g_assert_cmpuint (bytes_read, ==, 1024);
390
391 return NULL;
392 }
393
394 static gpointer
test_read_write_writev_thread(gpointer user_data)395 test_read_write_writev_thread (gpointer user_data)
396 {
397 TestReadWriteData *data = user_data;
398 gsize bytes_written;
399 GError *error = NULL;
400 gboolean res;
401 GOutputVector vectors[3];
402
403 vectors[0].buffer = data->write_data;
404 vectors[0].size = 256;
405 vectors[1].buffer = data->write_data + 256;
406 vectors[1].size = 256;
407 vectors[2].buffer = data->write_data + 512;
408 vectors[2].size = 512;
409
410 res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
411 g_assert_true (res);
412 g_assert_no_error (error);
413 g_assert_cmpuint (bytes_written, ==, 1024);
414
415 return NULL;
416 }
417
418 /* test if normal writing/reading from a pipe works */
419 static void
test_read_write(gconstpointer user_data)420 test_read_write (gconstpointer user_data)
421 {
422 gboolean writev = GPOINTER_TO_INT (user_data);
423 GUnixInputStream *is;
424 GUnixOutputStream *os;
425 gint fd[2];
426 guint8 data_write[1024], data_read[1024];
427 guint i;
428 GThread *write_thread, *read_thread;
429 TestReadWriteData data;
430
431 for (i = 0; i < sizeof (data_write); i++)
432 data_write[i] = i;
433
434 g_assert_cmpint (pipe (fd), ==, 0);
435
436 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
437 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
438
439 data.is = G_INPUT_STREAM (is);
440 data.os = G_OUTPUT_STREAM (os);
441 data.read_data = data_read;
442 data.write_data = data_write;
443
444 if (writev)
445 write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
446 else
447 write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
448 read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
449
450 g_thread_join (write_thread);
451 g_thread_join (read_thread);
452
453 g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
454
455 g_object_unref (os);
456 g_object_unref (is);
457 }
458
459 /* test if g_pollable_output_stream_write_nonblocking() and
460 * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
461 * and correctly reset their status afterwards again, and all data that is
462 * written can also be read again.
463 */
464 static void
test_write_wouldblock(void)465 test_write_wouldblock (void)
466 {
467 #ifndef F_GETPIPE_SZ
468 g_test_skip ("F_GETPIPE_SZ not defined");
469 #else /* if F_GETPIPE_SZ */
470 GUnixInputStream *is;
471 GUnixOutputStream *os;
472 gint fd[2];
473 GError *err = NULL;
474 guint8 data_write[1024], data_read[1024];
475 guint i;
476 gint pipe_capacity;
477
478 for (i = 0; i < sizeof (data_write); i++)
479 data_write[i] = i;
480
481 g_assert_cmpint (pipe (fd), ==, 0);
482
483 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
484 pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
485 g_assert_cmpint (pipe_capacity, >=, 4096);
486 g_assert_cmpint (pipe_capacity % 1024, >=, 0);
487
488 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
489 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
490
491 /* Run the whole thing three times to make sure that the streams
492 * reset the writability/readability state again */
493 for (i = 0; i < 3; i++) {
494 gssize written = 0, written_complete = 0;
495 gssize read = 0, read_complete = 0;
496
497 do
498 {
499 written_complete += written;
500 written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
501 data_write,
502 sizeof (data_write),
503 NULL,
504 &err);
505 }
506 while (written > 0);
507
508 g_assert_cmpuint (written_complete, >, 0);
509 g_assert_nonnull (err);
510 g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
511 g_clear_error (&err);
512
513 do
514 {
515 read_complete += read;
516 read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
517 data_read,
518 sizeof (data_read),
519 NULL,
520 &err);
521 if (read > 0)
522 g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
523 }
524 while (read > 0);
525
526 g_assert_cmpuint (read_complete, ==, written_complete);
527 g_assert_nonnull (err);
528 g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
529 g_clear_error (&err);
530 }
531
532 g_object_unref (os);
533 g_object_unref (is);
534 #endif /* if F_GETPIPE_SZ */
535 }
536
537 /* test if g_pollable_output_stream_writev_nonblocking() and
538 * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
539 * and correctly reset their status afterwards again, and all data that is
540 * written can also be read again.
541 */
542 static void
test_writev_wouldblock(void)543 test_writev_wouldblock (void)
544 {
545 #ifndef F_GETPIPE_SZ
546 g_test_skip ("F_GETPIPE_SZ not defined");
547 #else /* if F_GETPIPE_SZ */
548 GUnixInputStream *is;
549 GUnixOutputStream *os;
550 gint fd[2];
551 GError *err = NULL;
552 guint8 data_write[1024], data_read[1024];
553 guint i;
554 GOutputVector vectors[4];
555 GPollableReturn res;
556 gint pipe_capacity;
557
558 for (i = 0; i < sizeof (data_write); i++)
559 data_write[i] = i;
560
561 g_assert_cmpint (pipe (fd), ==, 0);
562
563 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
564 pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
565 g_assert_cmpint (pipe_capacity, >=, 4096);
566 g_assert_cmpint (pipe_capacity % 1024, >=, 0);
567
568 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
569 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
570
571 /* Run the whole thing three times to make sure that the streams
572 * reset the writability/readability state again */
573 for (i = 0; i < 3; i++) {
574 gsize written = 0, written_complete = 0;
575 gssize read = 0, read_complete = 0;
576
577 do
578 {
579 written_complete += written;
580
581 vectors[0].buffer = data_write;
582 vectors[0].size = 256;
583 vectors[1].buffer = data_write + 256;
584 vectors[1].size = 256;
585 vectors[2].buffer = data_write + 512;
586 vectors[2].size = 256;
587 vectors[3].buffer = data_write + 768;
588 vectors[3].size = 256;
589
590 res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
591 vectors,
592 G_N_ELEMENTS (vectors),
593 &written,
594 NULL,
595 &err);
596 }
597 while (res == G_POLLABLE_RETURN_OK);
598
599 g_assert_cmpuint (written_complete, >, 0);
600 g_assert_null (err);
601 g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
602 /* writev() on UNIX streams either succeeds fully or not at all */
603 g_assert_cmpuint (written, ==, 0);
604
605 do
606 {
607 read_complete += read;
608 read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
609 data_read,
610 sizeof (data_read),
611 NULL,
612 &err);
613 if (read > 0)
614 g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
615 }
616 while (read > 0);
617
618 g_assert_cmpuint (read_complete, ==, written_complete);
619 g_assert_nonnull (err);
620 g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
621 g_clear_error (&err);
622 }
623
624 g_object_unref (os);
625 g_object_unref (is);
626 #endif /* if F_GETPIPE_SZ */
627 }
628
629 #ifdef F_GETPIPE_SZ
630 static void
write_async_wouldblock_cb(GUnixOutputStream * os,GAsyncResult * result,gpointer user_data)631 write_async_wouldblock_cb (GUnixOutputStream *os,
632 GAsyncResult *result,
633 gpointer user_data)
634 {
635 gsize *bytes_written = user_data;
636 GError *err = NULL;
637
638 g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
639 g_assert_no_error (err);
640 }
641
642 static void
read_async_wouldblock_cb(GUnixInputStream * is,GAsyncResult * result,gpointer user_data)643 read_async_wouldblock_cb (GUnixInputStream *is,
644 GAsyncResult *result,
645 gpointer user_data)
646 {
647 gsize *bytes_read = user_data;
648 GError *err = NULL;
649
650 g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
651 g_assert_no_error (err);
652 }
653 #endif /* if F_GETPIPE_SZ */
654
655 /* test if the async implementation of write_all() and read_all() in G*Stream
656 * around the GPollable*Stream API is working correctly.
657 */
658 static void
test_write_async_wouldblock(void)659 test_write_async_wouldblock (void)
660 {
661 #ifndef F_GETPIPE_SZ
662 g_test_skip ("F_GETPIPE_SZ not defined");
663 #else /* if F_GETPIPE_SZ */
664 GUnixInputStream *is;
665 GUnixOutputStream *os;
666 gint fd[2];
667 guint8 *data, *data_read;
668 guint i;
669 gint pipe_capacity;
670 gsize bytes_written = 0, bytes_read = 0;
671
672 g_assert_cmpint (pipe (fd), ==, 0);
673
674 /* FIXME: These should not be needed but otherwise
675 * g_unix_output_stream_write() will block because
676 * a) the fd is writable
677 * b) writing 4x capacity will block because writes are atomic
678 * c) the fd is blocking
679 *
680 * See https://gitlab.gnome.org/GNOME/glib/issues/1654
681 */
682 g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
683 g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
684
685 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
686 pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
687 g_assert_cmpint (pipe_capacity, >=, 4096);
688
689 data = g_new (guint8, 4 * pipe_capacity);
690 for (i = 0; i < 4 * pipe_capacity; i++)
691 data[i] = i;
692 data_read = g_new (guint8, 4 * pipe_capacity);
693
694 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
695 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
696
697 g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
698 data,
699 4 * pipe_capacity,
700 G_PRIORITY_DEFAULT,
701 NULL,
702 (GAsyncReadyCallback) write_async_wouldblock_cb,
703 &bytes_written);
704
705 g_input_stream_read_all_async (G_INPUT_STREAM (is),
706 data_read,
707 4 * pipe_capacity,
708 G_PRIORITY_DEFAULT,
709 NULL,
710 (GAsyncReadyCallback) read_async_wouldblock_cb,
711 &bytes_read);
712
713 while (bytes_written == 0 && bytes_read == 0)
714 g_main_context_iteration (NULL, TRUE);
715
716 g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
717 g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
718 g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
719
720 g_free (data);
721 g_free (data_read);
722
723 g_object_unref (os);
724 g_object_unref (is);
725 #endif /* if F_GETPIPE_SZ */
726 }
727
728 #ifdef F_GETPIPE_SZ
729 static void
writev_async_wouldblock_cb(GUnixOutputStream * os,GAsyncResult * result,gpointer user_data)730 writev_async_wouldblock_cb (GUnixOutputStream *os,
731 GAsyncResult *result,
732 gpointer user_data)
733 {
734 gsize *bytes_written = user_data;
735 GError *err = NULL;
736
737 g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
738 g_assert_no_error (err);
739 }
740 #endif /* if F_GETPIPE_SZ */
741
742 /* test if the async implementation of writev_all() and read_all() in G*Stream
743 * around the GPollable*Stream API is working correctly.
744 */
745 static void
test_writev_async_wouldblock(void)746 test_writev_async_wouldblock (void)
747 {
748 #ifndef F_GETPIPE_SZ
749 g_test_skip ("F_GETPIPE_SZ not defined");
750 #else /* if F_GETPIPE_SZ */
751 GUnixInputStream *is;
752 GUnixOutputStream *os;
753 gint fd[2];
754 guint8 *data, *data_read;
755 guint i;
756 gint pipe_capacity;
757 gsize bytes_written = 0, bytes_read = 0;
758 GOutputVector vectors[4];
759
760 g_assert_cmpint (pipe (fd), ==, 0);
761
762 /* FIXME: These should not be needed but otherwise
763 * g_unix_output_stream_writev() will block because
764 * a) the fd is writable
765 * b) writing 4x capacity will block because writes are atomic
766 * c) the fd is blocking
767 *
768 * See https://gitlab.gnome.org/GNOME/glib/issues/1654
769 */
770 g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
771 g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
772
773 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
774 pipe_capacity = fcntl (fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
775 g_assert_cmpint (pipe_capacity, >=, 4096);
776
777 data = g_new (guint8, 4 * pipe_capacity);
778 for (i = 0; i < 4 * pipe_capacity; i++)
779 data[i] = i;
780 data_read = g_new (guint8, 4 * pipe_capacity);
781
782 vectors[0].buffer = data;
783 vectors[0].size = 1024;
784 vectors[1].buffer = data + 1024;
785 vectors[1].size = 1024;
786 vectors[2].buffer = data + 2048;
787 vectors[2].size = 1024;
788 vectors[3].buffer = data + 3072;
789 vectors[3].size = 4 * pipe_capacity - 3072;
790
791 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
792 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
793
794 g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
795 vectors,
796 G_N_ELEMENTS (vectors),
797 G_PRIORITY_DEFAULT,
798 NULL,
799 (GAsyncReadyCallback) writev_async_wouldblock_cb,
800 &bytes_written);
801
802 g_input_stream_read_all_async (G_INPUT_STREAM (is),
803 data_read,
804 4 * pipe_capacity,
805 G_PRIORITY_DEFAULT,
806 NULL,
807 (GAsyncReadyCallback) read_async_wouldblock_cb,
808 &bytes_read);
809
810 while (bytes_written == 0 && bytes_read == 0)
811 g_main_context_iteration (NULL, TRUE);
812
813 g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
814 g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
815 g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
816
817 g_free (data);
818 g_free (data_read);
819
820 g_object_unref (os);
821 g_object_unref (is);
822 #endif /* F_GETPIPE_SZ */
823 }
824
825 int
main(int argc,char * argv[])826 main (int argc,
827 char *argv[])
828 {
829 g_test_init (&argc, &argv, NULL);
830
831 g_test_add_func ("/unix-streams/basic", test_basic);
832 g_test_add_data_func ("/unix-streams/pipe-io-test",
833 GINT_TO_POINTER (FALSE),
834 test_pipe_io);
835 g_test_add_data_func ("/unix-streams/nonblocking-io-test",
836 GINT_TO_POINTER (TRUE),
837 test_pipe_io);
838
839 g_test_add_data_func ("/unix-streams/read_write",
840 GINT_TO_POINTER (FALSE),
841 test_read_write);
842
843 g_test_add_data_func ("/unix-streams/read_writev",
844 GINT_TO_POINTER (TRUE),
845 test_read_write);
846
847 g_test_add_func ("/unix-streams/write-wouldblock",
848 test_write_wouldblock);
849 g_test_add_func ("/unix-streams/writev-wouldblock",
850 test_writev_wouldblock);
851
852 g_test_add_func ("/unix-streams/write-async-wouldblock",
853 test_write_async_wouldblock);
854 g_test_add_func ("/unix-streams/writev-async-wouldblock",
855 test_writev_async_wouldblock);
856
857 return g_test_run();
858 }
859