1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3 * Copyright 2013 Red Hat, Inc.
4 */
5
6 #include "test-utils.h"
7
8 static void
force_io_streams_init(void)9 force_io_streams_init (void)
10 {
11 SoupServer *server;
12 SoupSession *session;
13 SoupURI *base_uri;
14 SoupMessage *msg;
15
16 /* Poke libsoup enough to cause SoupBodyInputStream and
17 * SoupBodyOutputStream to get defined, so we can find them
18 * via g_type_from_name() later.
19 */
20
21 server = soup_test_server_new (TRUE);
22 base_uri = soup_test_server_get_uri (server, "http", NULL);
23
24 session = soup_test_session_new (SOUP_TYPE_SESSION, NULL);
25 msg = soup_message_new_from_uri ("POST", base_uri);
26 soup_session_send_message (session, msg);
27 g_object_unref (msg);
28 soup_test_session_abort_unref (session);
29
30 soup_uri_free (base_uri);
31 soup_test_server_quit_unref (server);
32 }
33
34 typedef struct {
35 GFilterInputStream grandparent;
36
37 gpointer *soup_filter_input_stream_private;
38
39 gboolean is_readable;
40 } SlowInputStream;
41
42 typedef struct {
43 GFilterInputStreamClass grandparent;
44 } SlowInputStreamClass;
45
46 GType slow_input_stream_get_type (void);
47 static void slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface,
48 gpointer interface_data);
49
50 G_DEFINE_TYPE_WITH_CODE (SlowInputStream, slow_input_stream,
51 g_type_from_name ("SoupFilterInputStream"),
52 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, slow_pollable_input_stream_init);
53 )
54
55 static void
slow_input_stream_init(SlowInputStream * sis)56 slow_input_stream_init (SlowInputStream *sis)
57 {
58 }
59
60 static gssize
slow_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)61 slow_input_stream_read (GInputStream *stream,
62 void *buffer,
63 gsize count,
64 GCancellable *cancellable,
65 GError **error)
66 {
67 return g_input_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream,
68 buffer, 1, cancellable, error);
69 }
70
71 static void
slow_input_stream_class_init(SlowInputStreamClass * sisclass)72 slow_input_stream_class_init (SlowInputStreamClass *sisclass)
73 {
74 GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (sisclass);
75
76 input_stream_class->read_fn = slow_input_stream_read;
77 }
78
79 static gboolean
slow_input_stream_is_readable(GPollableInputStream * stream)80 slow_input_stream_is_readable (GPollableInputStream *stream)
81 {
82 return ((SlowInputStream *)stream)->is_readable;
83 }
84
85 static gssize
slow_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)86 slow_input_stream_read_nonblocking (GPollableInputStream *stream,
87 void *buffer,
88 gsize count,
89 GError **error)
90 {
91 if (((SlowInputStream *)stream)->is_readable) {
92 ((SlowInputStream *)stream)->is_readable = FALSE;
93 return slow_input_stream_read (G_INPUT_STREAM (stream), buffer, count,
94 NULL, error);
95 } else {
96 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
97 "would block");
98 return -1;
99 }
100 }
101
102 static GSource *
slow_input_stream_create_source(GPollableInputStream * stream,GCancellable * cancellable)103 slow_input_stream_create_source (GPollableInputStream *stream,
104 GCancellable *cancellable)
105 {
106 GSource *base_source, *pollable_source;
107
108 ((SlowInputStream *)stream)->is_readable = TRUE;
109 base_source = g_timeout_source_new (0);
110 g_source_set_dummy_callback (base_source);
111
112 pollable_source = g_pollable_source_new (G_OBJECT (stream));
113 g_source_add_child_source (pollable_source, base_source);
114 g_source_unref (base_source);
115
116 return pollable_source;
117 }
118
119 static void
slow_pollable_input_stream_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)120 slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface,
121 gpointer interface_data)
122 {
123 pollable_interface->is_readable = slow_input_stream_is_readable;
124 pollable_interface->read_nonblocking = slow_input_stream_read_nonblocking;
125 pollable_interface->create_source = slow_input_stream_create_source;
126 }
127
128 typedef struct {
129 GFilterOutputStream parent;
130
131 gboolean is_writable;
132 } SlowOutputStream;
133
134 typedef struct {
135 GFilterOutputStreamClass parent;
136 } SlowOutputStreamClass;
137
138 GType slow_output_stream_get_type (void);
139
140 static void slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
141 gpointer interface_data);
142
143 G_DEFINE_TYPE_WITH_CODE (SlowOutputStream, slow_output_stream,
144 g_type_from_name ("GFilterOutputStream"),
145 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, slow_pollable_output_stream_init);
146 )
147
148 static void
slow_output_stream_init(SlowOutputStream * sis)149 slow_output_stream_init (SlowOutputStream *sis)
150 {
151 }
152
153 static gssize
slow_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)154 slow_output_stream_write (GOutputStream *stream,
155 const void *buffer,
156 gsize count,
157 GCancellable *cancellable,
158 GError **error)
159 {
160 return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream,
161 buffer, 1, cancellable, error);
162 }
163
164 static void
slow_output_stream_class_init(SlowOutputStreamClass * sisclass)165 slow_output_stream_class_init (SlowOutputStreamClass *sisclass)
166 {
167 GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass);
168
169 output_stream_class->write_fn = slow_output_stream_write;
170 }
171
172 static gboolean
slow_output_stream_is_writable(GPollableOutputStream * stream)173 slow_output_stream_is_writable (GPollableOutputStream *stream)
174 {
175 return ((SlowOutputStream *)stream)->is_writable;
176 }
177
178 static gssize
slow_output_stream_write_nonblocking(GPollableOutputStream * stream,const void * buffer,gsize count,GError ** error)179 slow_output_stream_write_nonblocking (GPollableOutputStream *stream,
180 const void *buffer,
181 gsize count,
182 GError **error)
183 {
184 if (((SlowOutputStream *)stream)->is_writable) {
185 ((SlowOutputStream *)stream)->is_writable = FALSE;
186 return slow_output_stream_write (G_OUTPUT_STREAM (stream), buffer, count,
187 NULL, error);
188 } else {
189 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
190 "would block");
191 return -1;
192 }
193 }
194
195 static GSource *
slow_output_stream_create_source(GPollableOutputStream * stream,GCancellable * cancellable)196 slow_output_stream_create_source (GPollableOutputStream *stream,
197 GCancellable *cancellable)
198 {
199 GSource *base_source, *pollable_source;
200
201 ((SlowOutputStream *)stream)->is_writable = TRUE;
202 base_source = g_timeout_source_new (0);
203 g_source_set_dummy_callback (base_source);
204
205 pollable_source = g_pollable_source_new (G_OBJECT (stream));
206 g_source_add_child_source (pollable_source, base_source);
207 g_source_unref (base_source);
208
209 return pollable_source;
210 }
211
212 static void
slow_pollable_output_stream_init(GPollableOutputStreamInterface * pollable_interface,gpointer interface_data)213 slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
214 gpointer interface_data)
215 {
216 pollable_interface->is_writable = slow_output_stream_is_writable;
217 pollable_interface->write_nonblocking = slow_output_stream_write_nonblocking;
218 pollable_interface->create_source = slow_output_stream_create_source;
219 }
220
221 typedef struct {
222 GFilterOutputStream parent;
223
224 gboolean is_broken;
225 } BreakingOutputStream;
226
227 typedef struct {
228 GFilterOutputStreamClass parent;
229 } BreakingOutputStreamClass;
230
231 GType breaking_output_stream_get_type (void);
232
233 static void breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
234 gpointer interface_data);
235
236 G_DEFINE_TYPE_WITH_CODE (BreakingOutputStream, breaking_output_stream,
237 g_type_from_name ("GFilterOutputStream"),
238 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, breaking_pollable_output_stream_init);
239 )
240
241 static void
breaking_output_stream_init(BreakingOutputStream * sis)242 breaking_output_stream_init (BreakingOutputStream *sis)
243 {
244 }
245
246 static gssize
breaking_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)247 breaking_output_stream_write (GOutputStream *stream,
248 const void *buffer,
249 gsize count,
250 GCancellable *cancellable,
251 GError **error)
252 {
253 if (((BreakingOutputStream *)stream)->is_broken) {
254 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed");
255 return -1;
256 }
257
258 if (count > 128) {
259 ((BreakingOutputStream *)stream)->is_broken = TRUE;
260 count /= 2;
261 }
262 return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream,
263 buffer, count, cancellable, error);
264 }
265
266 static void
breaking_output_stream_class_init(BreakingOutputStreamClass * sisclass)267 breaking_output_stream_class_init (BreakingOutputStreamClass *sisclass)
268 {
269 GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass);
270
271 output_stream_class->write_fn = breaking_output_stream_write;
272 }
273
274 static gboolean
breaking_output_stream_is_writable(GPollableOutputStream * stream)275 breaking_output_stream_is_writable (GPollableOutputStream *stream)
276 {
277 return TRUE;
278 }
279
280 static gssize
breaking_output_stream_write_nonblocking(GPollableOutputStream * stream,const void * buffer,gsize count,GError ** error)281 breaking_output_stream_write_nonblocking (GPollableOutputStream *stream,
282 const void *buffer,
283 gsize count,
284 GError **error)
285 {
286 if (((BreakingOutputStream *)stream)->is_broken) {
287 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed");
288 return -1;
289 }
290
291 if (count > 128) {
292 ((BreakingOutputStream *)stream)->is_broken = TRUE;
293 count /= 2;
294 }
295 return g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (G_FILTER_OUTPUT_STREAM (stream)->base_stream),
296 buffer, count, NULL, error);
297 }
298
299 static GSource *
breaking_output_stream_create_source(GPollableOutputStream * stream,GCancellable * cancellable)300 breaking_output_stream_create_source (GPollableOutputStream *stream,
301 GCancellable *cancellable)
302 {
303 GSource *base_source, *pollable_source;
304
305 base_source = g_timeout_source_new (0);
306 g_source_set_dummy_callback (base_source);
307
308 pollable_source = g_pollable_source_new (G_OBJECT (stream));
309 g_source_add_child_source (pollable_source, base_source);
310 g_source_unref (base_source);
311
312 return pollable_source;
313 }
314
315 static void
breaking_pollable_output_stream_init(GPollableOutputStreamInterface * pollable_interface,gpointer interface_data)316 breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
317 gpointer interface_data)
318 {
319 pollable_interface->is_writable = breaking_output_stream_is_writable;
320 pollable_interface->write_nonblocking = breaking_output_stream_write_nonblocking;
321 pollable_interface->create_source = breaking_output_stream_create_source;
322 }
323
324 #define CHUNK_SIZE 1024
325
326 static GString *
chunkify(const char * str,gsize length)327 chunkify (const char *str, gsize length)
328 {
329 GString *gstr;
330 int i, size;
331
332 gstr = g_string_new (NULL);
333 for (i = 0; i < length; i += CHUNK_SIZE) {
334 size = MIN (CHUNK_SIZE, length - i);
335 g_string_append_printf (gstr, "%x\r\n", size);
336 g_string_append_len (gstr, str + i, size);
337 g_string_append (gstr, "\r\n");
338 }
339 g_string_append (gstr, "0\r\n\r\n");
340
341 return gstr;
342 }
343
344 static void
do_io_tests(void)345 do_io_tests (void)
346 {
347 GInputStream *imem, *islow, *in;
348 GOutputStream *omem, *oslow, *out;
349 GMemoryOutputStream *mem;
350 SoupBuffer *raw_contents;
351 char *buf;
352 GString *chunkified;
353 GError *error = NULL;
354 gssize nread, nwrote, total;
355 gssize chunk_length, chunk_total;
356
357 raw_contents = soup_test_get_index ();
358 chunkified = chunkify (raw_contents->data, raw_contents->length);
359
360 debug_printf (1, " sync read\n");
361
362 imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL);
363 islow = g_object_new (slow_input_stream_get_type (),
364 "base-stream", imem,
365 "close-base-stream", TRUE,
366 NULL);
367 in = g_object_new (g_type_from_name ("SoupBodyInputStream"),
368 "base-stream", islow,
369 "close-base-stream", TRUE,
370 "encoding", SOUP_ENCODING_CHUNKED,
371 NULL);
372 g_object_unref (imem);
373 g_object_unref (islow);
374
375 buf = g_malloc (raw_contents->length);
376 total = 0;
377 while (TRUE) {
378 nread = g_input_stream_read (in, buf + total,
379 raw_contents->length - total,
380 NULL, &error);
381 g_assert_no_error (error);
382 g_clear_error (&error);
383 if (nread > 0)
384 total += nread;
385 else
386 break;
387 }
388
389 g_input_stream_close (in, NULL, &error);
390 g_assert_no_error (error);
391 g_clear_error (&error);
392 g_object_unref (in);
393
394 soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length);
395 g_free (buf);
396
397 debug_printf (1, " async read\n");
398
399 imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL);
400 islow = g_object_new (slow_input_stream_get_type (),
401 "base-stream", imem,
402 "close-base-stream", TRUE,
403 NULL);
404 in = g_object_new (g_type_from_name ("SoupBodyInputStream"),
405 "base-stream", islow,
406 "close-base-stream", TRUE,
407 "encoding", SOUP_ENCODING_CHUNKED,
408 NULL);
409 g_object_unref (imem);
410 g_object_unref (islow);
411
412 buf = g_malloc (raw_contents->length);
413 total = 0;
414 while (TRUE) {
415 nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (in),
416 buf + total,
417 raw_contents->length - total,
418 NULL, &error);
419 if (nread == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
420 GSource *source;
421
422 g_clear_error (&error);
423 source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (in), NULL);
424 g_source_set_dummy_callback (source);
425 g_source_attach (source, NULL);
426 while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (in)))
427 g_main_context_iteration (NULL, TRUE);
428 g_source_destroy (source);
429 g_source_unref (source);
430 continue;
431 } else if (nread == -1) {
432 g_assert_no_error (error);
433 g_clear_error (&error);
434 break;
435 } else if (nread == 0)
436 break;
437 else
438 total += nread;
439 }
440
441 g_input_stream_close (in, NULL, &error);
442 g_assert_no_error (error);
443 g_clear_error (&error);
444 g_object_unref (in);
445
446 soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length);
447 g_free (buf);
448
449 debug_printf (1, " sync write\n");
450
451 buf = g_malloc (chunkified->len);
452 omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
453 oslow = g_object_new (slow_output_stream_get_type (),
454 "base-stream", omem,
455 "close-base-stream", TRUE,
456 NULL);
457 out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
458 "base-stream", oslow,
459 "close-base-stream", TRUE,
460 "encoding", SOUP_ENCODING_CHUNKED,
461 NULL);
462 g_object_unref (omem);
463 g_object_unref (oslow);
464
465 total = chunk_length = chunk_total = 0;
466 while (total < raw_contents->length) {
467 if (chunk_total == chunk_length) {
468 chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total);
469 chunk_total = 0;
470 }
471 nwrote = g_output_stream_write (out, raw_contents->data + total,
472 chunk_length - chunk_total, NULL, &error);
473 g_assert_no_error (error);
474 g_clear_error (&error);
475 if (nwrote > 0) {
476 total += nwrote;
477 chunk_total += nwrote;
478 } else
479 break;
480 }
481
482 g_output_stream_close (out, NULL, &error);
483 g_assert_no_error (error);
484 g_clear_error (&error);
485
486 mem = G_MEMORY_OUTPUT_STREAM (omem);
487 soup_assert_cmpmem (g_memory_output_stream_get_data (mem),
488 g_memory_output_stream_get_data_size (mem),
489 chunkified->str, chunkified->len);
490
491 g_object_unref (out);
492 g_free (buf);
493
494 debug_printf (1, " async write\n");
495
496 buf = g_malloc (chunkified->len);
497 omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
498 oslow = g_object_new (slow_output_stream_get_type (),
499 "base-stream", omem,
500 "close-base-stream", TRUE,
501 NULL);
502 out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
503 "base-stream", oslow,
504 "close-base-stream", TRUE,
505 "encoding", SOUP_ENCODING_CHUNKED,
506 NULL);
507 g_object_unref (omem);
508 g_object_unref (oslow);
509
510 total = chunk_length = chunk_total = 0;
511 while (total < raw_contents->length) {
512 if (chunk_total == chunk_length) {
513 chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total);
514 chunk_total = 0;
515 }
516 nwrote = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (out),
517 raw_contents->data + total,
518 chunk_length - chunk_total,
519 NULL, &error);
520 if (nwrote == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
521 GSource *source;
522
523 g_clear_error (&error);
524 source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (out), NULL);
525 g_source_set_dummy_callback (source);
526 g_source_attach (source, NULL);
527 while (!g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (out)))
528 g_main_context_iteration (NULL, TRUE);
529 g_source_destroy (source);
530 g_source_unref (source);
531 continue;
532 } else if (nwrote == -1) {
533 g_assert_no_error (error);
534 g_clear_error (&error);
535 break;
536 } else {
537 total += nwrote;
538 chunk_total += nwrote;
539 }
540 }
541
542 g_output_stream_close (out, NULL, &error);
543 g_assert_no_error (error);
544 g_clear_error (&error);
545
546 mem = G_MEMORY_OUTPUT_STREAM (omem);
547 soup_assert_cmpmem (g_memory_output_stream_get_data (mem),
548 g_memory_output_stream_get_data_size (mem),
549 chunkified->str, chunkified->len);
550
551 g_object_unref (out);
552 g_free (buf);
553
554 debug_printf (1, " failed write\n");
555 /* this succeeds if it doesn't critical */
556
557 buf = g_malloc (chunkified->len);
558 omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
559 oslow = g_object_new (breaking_output_stream_get_type (),
560 "base-stream", omem,
561 "close-base-stream", TRUE,
562 NULL);
563 out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
564 "base-stream", oslow,
565 "close-base-stream", TRUE,
566 "encoding", SOUP_ENCODING_CHUNKED,
567 NULL);
568 g_object_unref (omem);
569 g_object_unref (oslow);
570
571 total = 0;
572 while (total < raw_contents->length) {
573 nwrote = g_output_stream_write (out, raw_contents->data + total,
574 raw_contents->length - total, NULL, NULL);
575 if (nwrote == -1)
576 break;
577 else
578 total += nwrote;
579 }
580
581 g_assert_cmpint (total, !=, raw_contents->length);
582
583 g_output_stream_close (out, NULL, NULL);
584 g_object_unref (out);
585
586 g_free (buf);
587
588 g_string_free (chunkified, TRUE);
589 }
590
591 int
main(int argc,char ** argv)592 main (int argc, char **argv)
593 {
594 int ret;
595
596 test_init (argc, argv, NULL);
597
598 force_io_streams_init ();
599
600 g_test_add_func ("/chunk-io", do_io_tests);
601
602 ret = g_test_run ();
603
604 test_cleanup ();
605 return ret;
606 }
607