• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright 2013 Red Hat, Inc.
4  */
5 
6 #include "test-utils.h"
7 
8 static void
force_io_streams_init(void)9 force_io_streams_init (void)
10 {
11 	SoupServer *server;
12 	SoupSession *session;
13 	SoupURI *base_uri;
14 	SoupMessage *msg;
15 
16 	/* Poke libsoup enough to cause SoupBodyInputStream and
17 	 * SoupBodyOutputStream to get defined, so we can find them
18 	 * via g_type_from_name() later.
19 	 */
20 
21 	server = soup_test_server_new (TRUE);
22 	base_uri = soup_test_server_get_uri (server, "http", NULL);
23 
24 	session = soup_test_session_new (SOUP_TYPE_SESSION, NULL);
25 	msg = soup_message_new_from_uri ("POST", base_uri);
26 	soup_session_send_message (session, msg);
27 	g_object_unref (msg);
28 	soup_test_session_abort_unref (session);
29 
30 	soup_uri_free (base_uri);
31 	soup_test_server_quit_unref (server);
32 }
33 
34 typedef struct {
35 	GFilterInputStream grandparent;
36 
37 	gpointer *soup_filter_input_stream_private;
38 
39 	gboolean is_readable;
40 } SlowInputStream;
41 
42 typedef struct {
43 	GFilterInputStreamClass grandparent;
44 } SlowInputStreamClass;
45 
46 GType slow_input_stream_get_type (void);
47 static void slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface,
48 					     gpointer interface_data);
49 
50 G_DEFINE_TYPE_WITH_CODE (SlowInputStream, slow_input_stream,
51 			 g_type_from_name ("SoupFilterInputStream"),
52 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, slow_pollable_input_stream_init);
53 			 )
54 
55 static void
slow_input_stream_init(SlowInputStream * sis)56 slow_input_stream_init (SlowInputStream *sis)
57 {
58 }
59 
60 static gssize
slow_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)61 slow_input_stream_read (GInputStream  *stream,
62 			void          *buffer,
63 			gsize          count,
64 			GCancellable  *cancellable,
65 			GError       **error)
66 {
67 	return g_input_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream,
68 				    buffer, 1, cancellable, error);
69 }
70 
71 static void
slow_input_stream_class_init(SlowInputStreamClass * sisclass)72 slow_input_stream_class_init (SlowInputStreamClass *sisclass)
73 {
74 	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (sisclass);
75 
76 	input_stream_class->read_fn = slow_input_stream_read;
77 }
78 
79 static gboolean
slow_input_stream_is_readable(GPollableInputStream * stream)80 slow_input_stream_is_readable (GPollableInputStream *stream)
81 {
82 	return ((SlowInputStream *)stream)->is_readable;
83 }
84 
85 static gssize
slow_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)86 slow_input_stream_read_nonblocking (GPollableInputStream  *stream,
87 				    void                  *buffer,
88 				    gsize                  count,
89 				    GError               **error)
90 {
91 	if (((SlowInputStream *)stream)->is_readable) {
92 		((SlowInputStream *)stream)->is_readable = FALSE;
93 		return slow_input_stream_read (G_INPUT_STREAM (stream), buffer, count,
94 					       NULL, error);
95 	} else {
96 		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
97 				     "would block");
98 		return -1;
99 	}
100 }
101 
102 static GSource *
slow_input_stream_create_source(GPollableInputStream * stream,GCancellable * cancellable)103 slow_input_stream_create_source (GPollableInputStream *stream,
104 				 GCancellable *cancellable)
105 {
106 	GSource *base_source, *pollable_source;
107 
108 	((SlowInputStream *)stream)->is_readable = TRUE;
109 	base_source = g_timeout_source_new (0);
110 	g_source_set_dummy_callback (base_source);
111 
112 	pollable_source = g_pollable_source_new (G_OBJECT (stream));
113 	g_source_add_child_source (pollable_source, base_source);
114 	g_source_unref (base_source);
115 
116 	return pollable_source;
117 }
118 
119 static void
slow_pollable_input_stream_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)120 slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface,
121 				 gpointer interface_data)
122 {
123 	pollable_interface->is_readable = slow_input_stream_is_readable;
124 	pollable_interface->read_nonblocking = slow_input_stream_read_nonblocking;
125 	pollable_interface->create_source = slow_input_stream_create_source;
126 }
127 
128 typedef struct {
129 	GFilterOutputStream parent;
130 
131 	gboolean is_writable;
132 } SlowOutputStream;
133 
134 typedef struct {
135 	GFilterOutputStreamClass parent;
136 } SlowOutputStreamClass;
137 
138 GType slow_output_stream_get_type (void);
139 
140 static void slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
141 					      gpointer interface_data);
142 
143 G_DEFINE_TYPE_WITH_CODE (SlowOutputStream, slow_output_stream,
144 			 g_type_from_name ("GFilterOutputStream"),
145 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, slow_pollable_output_stream_init);
146 			 )
147 
148 static void
slow_output_stream_init(SlowOutputStream * sis)149 slow_output_stream_init (SlowOutputStream *sis)
150 {
151 }
152 
153 static gssize
slow_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)154 slow_output_stream_write (GOutputStream  *stream,
155 			  const void     *buffer,
156 			  gsize           count,
157 			  GCancellable   *cancellable,
158 			  GError        **error)
159 {
160 	return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream,
161 				      buffer, 1, cancellable, error);
162 }
163 
164 static void
slow_output_stream_class_init(SlowOutputStreamClass * sisclass)165 slow_output_stream_class_init (SlowOutputStreamClass *sisclass)
166 {
167 	GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass);
168 
169 	output_stream_class->write_fn = slow_output_stream_write;
170 }
171 
172 static gboolean
slow_output_stream_is_writable(GPollableOutputStream * stream)173 slow_output_stream_is_writable (GPollableOutputStream *stream)
174 {
175 	return ((SlowOutputStream *)stream)->is_writable;
176 }
177 
178 static gssize
slow_output_stream_write_nonblocking(GPollableOutputStream * stream,const void * buffer,gsize count,GError ** error)179 slow_output_stream_write_nonblocking (GPollableOutputStream  *stream,
180 				      const void             *buffer,
181 				      gsize                   count,
182 				      GError                **error)
183 {
184 	if (((SlowOutputStream *)stream)->is_writable) {
185 		((SlowOutputStream *)stream)->is_writable = FALSE;
186 		return slow_output_stream_write (G_OUTPUT_STREAM (stream), buffer, count,
187 						 NULL, error);
188 	} else {
189 		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
190 				     "would block");
191 		return -1;
192 	}
193 }
194 
195 static GSource *
slow_output_stream_create_source(GPollableOutputStream * stream,GCancellable * cancellable)196 slow_output_stream_create_source (GPollableOutputStream *stream,
197 				  GCancellable *cancellable)
198 {
199 	GSource *base_source, *pollable_source;
200 
201 	((SlowOutputStream *)stream)->is_writable = TRUE;
202 	base_source = g_timeout_source_new (0);
203 	g_source_set_dummy_callback (base_source);
204 
205 	pollable_source = g_pollable_source_new (G_OBJECT (stream));
206 	g_source_add_child_source (pollable_source, base_source);
207 	g_source_unref (base_source);
208 
209 	return pollable_source;
210 }
211 
212 static void
slow_pollable_output_stream_init(GPollableOutputStreamInterface * pollable_interface,gpointer interface_data)213 slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
214 				  gpointer interface_data)
215 {
216 	pollable_interface->is_writable = slow_output_stream_is_writable;
217 	pollable_interface->write_nonblocking = slow_output_stream_write_nonblocking;
218 	pollable_interface->create_source = slow_output_stream_create_source;
219 }
220 
221 typedef struct {
222 	GFilterOutputStream parent;
223 
224 	gboolean is_broken;
225 } BreakingOutputStream;
226 
227 typedef struct {
228 	GFilterOutputStreamClass parent;
229 } BreakingOutputStreamClass;
230 
231 GType breaking_output_stream_get_type (void);
232 
233 static void breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
234 						  gpointer interface_data);
235 
236 G_DEFINE_TYPE_WITH_CODE (BreakingOutputStream, breaking_output_stream,
237 			 g_type_from_name ("GFilterOutputStream"),
238 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, breaking_pollable_output_stream_init);
239 			 )
240 
241 static void
breaking_output_stream_init(BreakingOutputStream * sis)242 breaking_output_stream_init (BreakingOutputStream *sis)
243 {
244 }
245 
246 static gssize
breaking_output_stream_write(GOutputStream * stream,const void * buffer,gsize count,GCancellable * cancellable,GError ** error)247 breaking_output_stream_write (GOutputStream  *stream,
248 			      const void     *buffer,
249 			      gsize           count,
250 			      GCancellable   *cancellable,
251 			      GError        **error)
252 {
253 	if (((BreakingOutputStream *)stream)->is_broken) {
254 		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed");
255 		return -1;
256 	}
257 
258 	if (count > 128) {
259 		((BreakingOutputStream *)stream)->is_broken = TRUE;
260 		count /= 2;
261 	}
262 	return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream,
263 				      buffer, count, cancellable, error);
264 }
265 
266 static void
breaking_output_stream_class_init(BreakingOutputStreamClass * sisclass)267 breaking_output_stream_class_init (BreakingOutputStreamClass *sisclass)
268 {
269 	GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass);
270 
271 	output_stream_class->write_fn = breaking_output_stream_write;
272 }
273 
274 static gboolean
breaking_output_stream_is_writable(GPollableOutputStream * stream)275 breaking_output_stream_is_writable (GPollableOutputStream *stream)
276 {
277 	return TRUE;
278 }
279 
280 static gssize
breaking_output_stream_write_nonblocking(GPollableOutputStream * stream,const void * buffer,gsize count,GError ** error)281 breaking_output_stream_write_nonblocking (GPollableOutputStream  *stream,
282 					  const void             *buffer,
283 					  gsize                   count,
284 					  GError                **error)
285 {
286 	if (((BreakingOutputStream *)stream)->is_broken) {
287 		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed");
288 		return -1;
289 	}
290 
291 	if (count > 128) {
292 		((BreakingOutputStream *)stream)->is_broken = TRUE;
293 		count /= 2;
294 	}
295 	return g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (G_FILTER_OUTPUT_STREAM (stream)->base_stream),
296 							   buffer, count, NULL, error);
297 }
298 
299 static GSource *
breaking_output_stream_create_source(GPollableOutputStream * stream,GCancellable * cancellable)300 breaking_output_stream_create_source (GPollableOutputStream *stream,
301 				      GCancellable *cancellable)
302 {
303 	GSource *base_source, *pollable_source;
304 
305 	base_source = g_timeout_source_new (0);
306 	g_source_set_dummy_callback (base_source);
307 
308 	pollable_source = g_pollable_source_new (G_OBJECT (stream));
309 	g_source_add_child_source (pollable_source, base_source);
310 	g_source_unref (base_source);
311 
312 	return pollable_source;
313 }
314 
315 static void
breaking_pollable_output_stream_init(GPollableOutputStreamInterface * pollable_interface,gpointer interface_data)316 breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
317 				  gpointer interface_data)
318 {
319 	pollable_interface->is_writable = breaking_output_stream_is_writable;
320 	pollable_interface->write_nonblocking = breaking_output_stream_write_nonblocking;
321 	pollable_interface->create_source = breaking_output_stream_create_source;
322 }
323 
324 #define CHUNK_SIZE 1024
325 
326 static GString *
chunkify(const char * str,gsize length)327 chunkify (const char *str, gsize length)
328 {
329 	GString *gstr;
330 	int i, size;
331 
332 	gstr = g_string_new (NULL);
333 	for (i = 0; i < length; i += CHUNK_SIZE) {
334 		size = MIN (CHUNK_SIZE, length - i);
335 		g_string_append_printf (gstr, "%x\r\n", size);
336 		g_string_append_len (gstr, str + i, size);
337 		g_string_append (gstr, "\r\n");
338 	}
339 	g_string_append (gstr, "0\r\n\r\n");
340 
341 	return gstr;
342 }
343 
344 static void
do_io_tests(void)345 do_io_tests (void)
346 {
347 	GInputStream *imem, *islow, *in;
348 	GOutputStream *omem, *oslow, *out;
349 	GMemoryOutputStream *mem;
350 	SoupBuffer *raw_contents;
351 	char *buf;
352 	GString *chunkified;
353 	GError *error = NULL;
354 	gssize nread, nwrote, total;
355 	gssize chunk_length, chunk_total;
356 
357 	raw_contents = soup_test_get_index ();
358 	chunkified = chunkify (raw_contents->data, raw_contents->length);
359 
360 	debug_printf (1, "  sync read\n");
361 
362 	imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL);
363 	islow = g_object_new (slow_input_stream_get_type (),
364 			      "base-stream", imem,
365 			      "close-base-stream", TRUE,
366 			      NULL);
367 	in = g_object_new (g_type_from_name ("SoupBodyInputStream"),
368 			   "base-stream", islow,
369 			   "close-base-stream", TRUE,
370 			   "encoding", SOUP_ENCODING_CHUNKED,
371 			   NULL);
372 	g_object_unref (imem);
373 	g_object_unref (islow);
374 
375 	buf = g_malloc (raw_contents->length);
376 	total = 0;
377 	while (TRUE) {
378 		nread = g_input_stream_read (in, buf + total,
379 					     raw_contents->length - total,
380 					     NULL, &error);
381 		g_assert_no_error (error);
382 		g_clear_error (&error);
383 		if (nread > 0)
384 			total += nread;
385 		else
386 			break;
387 	}
388 
389 	g_input_stream_close (in, NULL, &error);
390 	g_assert_no_error (error);
391 	g_clear_error (&error);
392 	g_object_unref (in);
393 
394 	soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length);
395 	g_free (buf);
396 
397 	debug_printf (1, "  async read\n");
398 
399 	imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL);
400 	islow = g_object_new (slow_input_stream_get_type (),
401 			      "base-stream", imem,
402 			      "close-base-stream", TRUE,
403 			      NULL);
404 	in = g_object_new (g_type_from_name ("SoupBodyInputStream"),
405 			   "base-stream", islow,
406 			   "close-base-stream", TRUE,
407 			   "encoding", SOUP_ENCODING_CHUNKED,
408 			   NULL);
409 	g_object_unref (imem);
410 	g_object_unref (islow);
411 
412 	buf = g_malloc (raw_contents->length);
413 	total = 0;
414 	while (TRUE) {
415 		nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (in),
416 								  buf + total,
417 								  raw_contents->length - total,
418 								  NULL, &error);
419 		if (nread == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
420 			GSource *source;
421 
422 			g_clear_error (&error);
423 			source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (in), NULL);
424 			g_source_set_dummy_callback (source);
425 			g_source_attach (source, NULL);
426 			while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (in)))
427 				g_main_context_iteration (NULL, TRUE);
428 			g_source_destroy (source);
429 			g_source_unref (source);
430 			continue;
431 		} else if (nread == -1) {
432 			g_assert_no_error (error);
433 			g_clear_error (&error);
434 			break;
435 		} else if (nread == 0)
436 			break;
437 		else
438 			total += nread;
439 	}
440 
441 	g_input_stream_close (in, NULL, &error);
442 	g_assert_no_error (error);
443 	g_clear_error (&error);
444 	g_object_unref (in);
445 
446 	soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length);
447 	g_free (buf);
448 
449 	debug_printf (1, "  sync write\n");
450 
451 	buf = g_malloc (chunkified->len);
452 	omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
453 	oslow = g_object_new (slow_output_stream_get_type (),
454 			      "base-stream", omem,
455 			      "close-base-stream", TRUE,
456 			      NULL);
457 	out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
458 			    "base-stream", oslow,
459 			    "close-base-stream", TRUE,
460 			    "encoding", SOUP_ENCODING_CHUNKED,
461 			    NULL);
462 	g_object_unref (omem);
463 	g_object_unref (oslow);
464 
465 	total = chunk_length = chunk_total = 0;
466 	while (total < raw_contents->length) {
467 		if (chunk_total == chunk_length) {
468 			chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total);
469 			chunk_total = 0;
470 		}
471 		nwrote = g_output_stream_write (out, raw_contents->data + total,
472 						chunk_length - chunk_total, NULL, &error);
473 		g_assert_no_error (error);
474 		g_clear_error (&error);
475 		if (nwrote > 0) {
476 			total += nwrote;
477 			chunk_total += nwrote;
478 		} else
479 			break;
480 	}
481 
482 	g_output_stream_close (out, NULL, &error);
483 	g_assert_no_error (error);
484 	g_clear_error (&error);
485 
486 	mem = G_MEMORY_OUTPUT_STREAM (omem);
487 	soup_assert_cmpmem (g_memory_output_stream_get_data (mem),
488 			    g_memory_output_stream_get_data_size (mem),
489 			    chunkified->str, chunkified->len);
490 
491 	g_object_unref (out);
492 	g_free (buf);
493 
494 	debug_printf (1, "  async write\n");
495 
496 	buf = g_malloc (chunkified->len);
497 	omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
498 	oslow = g_object_new (slow_output_stream_get_type (),
499 			      "base-stream", omem,
500 			      "close-base-stream", TRUE,
501 			      NULL);
502 	out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
503 			    "base-stream", oslow,
504 			    "close-base-stream", TRUE,
505 			    "encoding", SOUP_ENCODING_CHUNKED,
506 			    NULL);
507 	g_object_unref (omem);
508 	g_object_unref (oslow);
509 
510 	total = chunk_length = chunk_total = 0;
511 	while (total < raw_contents->length) {
512 		if (chunk_total == chunk_length) {
513 			chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total);
514 			chunk_total = 0;
515 		}
516 		nwrote = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (out),
517 								     raw_contents->data + total,
518 								     chunk_length - chunk_total,
519 								     NULL, &error);
520 		if (nwrote == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
521 			GSource *source;
522 
523 			g_clear_error (&error);
524 			source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (out), NULL);
525 			g_source_set_dummy_callback (source);
526 			g_source_attach (source, NULL);
527 			while (!g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (out)))
528 				g_main_context_iteration (NULL, TRUE);
529 			g_source_destroy (source);
530 			g_source_unref (source);
531 			continue;
532 		} else if (nwrote == -1) {
533 			g_assert_no_error (error);
534 			g_clear_error (&error);
535 			break;
536 		} else {
537 			total += nwrote;
538 			chunk_total += nwrote;
539 		}
540 	}
541 
542 	g_output_stream_close (out, NULL, &error);
543 	g_assert_no_error (error);
544 	g_clear_error (&error);
545 
546 	mem = G_MEMORY_OUTPUT_STREAM (omem);
547 	soup_assert_cmpmem (g_memory_output_stream_get_data (mem),
548 			    g_memory_output_stream_get_data_size (mem),
549 			    chunkified->str, chunkified->len);
550 
551 	g_object_unref (out);
552 	g_free (buf);
553 
554 	debug_printf (1, "  failed write\n");
555 	/* this succeeds if it doesn't critical */
556 
557 	buf = g_malloc (chunkified->len);
558 	omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
559 	oslow = g_object_new (breaking_output_stream_get_type (),
560 			      "base-stream", omem,
561 			      "close-base-stream", TRUE,
562 			      NULL);
563 	out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
564 			    "base-stream", oslow,
565 			    "close-base-stream", TRUE,
566 			    "encoding", SOUP_ENCODING_CHUNKED,
567 			    NULL);
568 	g_object_unref (omem);
569 	g_object_unref (oslow);
570 
571 	total = 0;
572 	while (total < raw_contents->length) {
573 		nwrote = g_output_stream_write (out, raw_contents->data + total,
574 						raw_contents->length - total, NULL, NULL);
575 		if (nwrote == -1)
576 			break;
577 		else
578 			total += nwrote;
579 	}
580 
581 	g_assert_cmpint (total, !=, raw_contents->length);
582 
583 	g_output_stream_close (out, NULL, NULL);
584 	g_object_unref (out);
585 
586 	g_free (buf);
587 
588 	g_string_free (chunkified, TRUE);
589 }
590 
591 int
main(int argc,char ** argv)592 main (int argc, char **argv)
593 {
594 	int ret;
595 
596 	test_init (argc, argv, NULL);
597 
598 	force_io_streams_init ();
599 
600 	g_test_add_func ("/chunk-io", do_io_tests);
601 
602 	ret = g_test_run ();
603 
604 	test_cleanup ();
605 	return ret;
606 }
607