• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-io.c: HTTP message I/O
4  *
5  * Copyright (C) 2000-2003, Ximian, Inc.
6  */
7 
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11 
12 #include <glib/gi18n-lib.h>
13 
14 #include "soup.h"
15 #include "soup-body-input-stream.h"
16 #include "soup-body-output-stream.h"
17 #include "soup-client-input-stream.h"
18 #include "soup-connection.h"
19 #include "soup-content-processor.h"
20 #include "soup-content-sniffer-stream.h"
21 #include "soup-filter-input-stream.h"
22 #include "soup-message-private.h"
23 #include "soup-message-queue.h"
24 #include "soup-misc-private.h"
25 
26 typedef enum {
27 	SOUP_MESSAGE_IO_CLIENT,
28 	SOUP_MESSAGE_IO_SERVER
29 } SoupMessageIOMode;
30 
31 typedef enum {
32 	SOUP_MESSAGE_IO_STATE_NOT_STARTED,
33 	SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
34 	SOUP_MESSAGE_IO_STATE_HEADERS,
35 	SOUP_MESSAGE_IO_STATE_BLOCKING,
36 	SOUP_MESSAGE_IO_STATE_BODY_START,
37 	SOUP_MESSAGE_IO_STATE_BODY,
38 	SOUP_MESSAGE_IO_STATE_BODY_DATA,
39 	SOUP_MESSAGE_IO_STATE_BODY_FLUSH,
40 	SOUP_MESSAGE_IO_STATE_BODY_DONE,
41 	SOUP_MESSAGE_IO_STATE_FINISHING,
42 	SOUP_MESSAGE_IO_STATE_DONE
43 } SoupMessageIOState;
44 
45 #define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
46 	(state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
47 	 state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
48 	 state != SOUP_MESSAGE_IO_STATE_DONE)
49 #define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
50 	(SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
51 	 state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
52 
53 typedef struct {
54 	SoupMessageQueueItem *item;
55 	SoupMessageIOMode     mode;
56 	GCancellable         *cancellable;
57 
58 	GIOStream              *iostream;
59 	SoupFilterInputStream  *istream;
60 	GInputStream           *body_istream;
61 	GOutputStream          *ostream;
62 	GOutputStream          *body_ostream;
63 	GMainContext           *async_context;
64 
65 	SoupMessageIOState    read_state;
66 	SoupEncoding          read_encoding;
67 	GByteArray           *read_header_buf;
68 	SoupMessageBody      *read_body;
69 	goffset               read_length;
70 
71 	SoupMessageIOState    write_state;
72 	SoupEncoding          write_encoding;
73 	GString              *write_buf;
74 	SoupMessageBody      *write_body;
75 	SoupBuffer           *write_chunk;
76 	goffset               write_body_offset;
77 	goffset               write_length;
78 	goffset               written;
79 
80 	GSource *io_source;
81 	GSource *unpause_source;
82 	gboolean paused;
83 
84 	GCancellable *async_close_wait;
85 	GError       *async_close_error;
86 
87 	SoupMessageGetHeadersFn   get_headers_cb;
88 	SoupMessageParseHeadersFn parse_headers_cb;
89 	gpointer                  header_data;
90 	SoupMessageCompletionFn   completion_cb;
91 	gpointer                  completion_data;
92 } SoupMessageIOData;
93 
94 static void io_run (SoupMessage *msg, gboolean blocking);
95 
96 #define RESPONSE_BLOCK_SIZE 8192
97 #define HEADER_SIZE_LIMIT (64 * 1024)
98 
99 void
soup_message_io_cleanup(SoupMessage * msg)100 soup_message_io_cleanup (SoupMessage *msg)
101 {
102 	SoupMessageIOData *io;
103 
104 	soup_message_io_stop (msg);
105 
106 	io = soup_message_get_io_data (msg);
107 	if (!io)
108 		return;
109 	soup_message_set_io_data (msg, NULL);
110 
111 	if (io->iostream)
112 		g_object_unref (io->iostream);
113 	if (io->body_istream)
114 		g_object_unref (io->body_istream);
115 	if (io->body_ostream)
116 		g_object_unref (io->body_ostream);
117 	if (io->async_context)
118 		g_main_context_unref (io->async_context);
119 	if (io->item)
120 		soup_message_queue_item_unref (io->item);
121 
122 	g_byte_array_free (io->read_header_buf, TRUE);
123 
124 	g_string_free (io->write_buf, TRUE);
125 	if (io->write_chunk)
126 		soup_buffer_free (io->write_chunk);
127 
128 	if (io->async_close_wait) {
129 		g_cancellable_cancel (io->async_close_wait);
130 		g_clear_object (&io->async_close_wait);
131 	}
132 	g_clear_error (&io->async_close_error);
133 
134 	g_slice_free (SoupMessageIOData, io);
135 }
136 
137 void
soup_message_io_stop(SoupMessage * msg)138 soup_message_io_stop (SoupMessage *msg)
139 {
140 	SoupMessageIOData *io = soup_message_get_io_data (msg);
141 
142 	if (!io)
143 		return;
144 
145 	if (io->io_source) {
146 		g_source_destroy (io->io_source);
147 		g_source_unref (io->io_source);
148 		io->io_source = NULL;
149 	}
150 
151 	if (io->unpause_source) {
152 		g_source_destroy (io->unpause_source);
153 		g_source_unref (io->unpause_source);
154 		io->unpause_source = NULL;
155 	}
156 }
157 
158 void
soup_message_io_finished(SoupMessage * msg)159 soup_message_io_finished (SoupMessage *msg)
160 {
161 	SoupMessageIOData *io;
162 	SoupMessageCompletionFn completion_cb;
163 	gpointer completion_data;
164 	SoupMessageIOCompletion completion;
165 
166 	io = soup_message_get_io_data (msg);
167 	if (!io)
168 		return;
169 
170 	completion_cb = io->completion_cb;
171 	completion_data = io->completion_data;
172 
173 	if ((io->read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
174 	     io->write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
175 		completion = SOUP_MESSAGE_IO_COMPLETE;
176 	else
177 		completion = SOUP_MESSAGE_IO_INTERRUPTED;
178 
179 	g_object_ref (msg);
180 	soup_message_io_cleanup (msg);
181 	if (completion_cb)
182 		completion_cb (msg, completion, completion_data);
183 	g_object_unref (msg);
184 }
185 
186 GIOStream *
soup_message_io_steal(SoupMessage * msg)187 soup_message_io_steal (SoupMessage *msg)
188 {
189 	SoupMessageIOData *io;
190 	SoupMessageCompletionFn completion_cb;
191 	gpointer completion_data;
192 	GIOStream *iostream;
193 
194 	io = soup_message_get_io_data (msg);
195 	if (!io || !io->iostream)
196 		return NULL;
197 
198 	iostream = g_object_ref (io->iostream);
199 	completion_cb = io->completion_cb;
200 	completion_data = io->completion_data;
201 
202 	g_object_ref (msg);
203 	soup_message_io_cleanup (msg);
204 	if (completion_cb)
205 		completion_cb (msg, SOUP_MESSAGE_IO_STOLEN, completion_data);
206 	g_object_unref (msg);
207 
208 	return iostream;
209 }
210 
211 static gboolean
read_headers(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)212 read_headers (SoupMessage *msg, gboolean blocking,
213 	      GCancellable *cancellable, GError **error)
214 {
215 	SoupMessageIOData *io;
216 	gssize nread, old_len;
217 	gboolean got_lf;
218 
219 	io = soup_message_get_io_data (msg);
220 	while (1) {
221 		old_len = io->read_header_buf->len;
222 		g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
223 		nread = soup_filter_input_stream_read_line (io->istream,
224 							    io->read_header_buf->data + old_len,
225 							    RESPONSE_BLOCK_SIZE,
226 							    blocking,
227 							    &got_lf,
228 							    cancellable, error);
229 		io->read_header_buf->len = old_len + MAX (nread, 0);
230 		if (nread == 0) {
231 			if (io->read_header_buf->len > 0)
232 				break;
233 			soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
234 			g_set_error_literal (error, G_IO_ERROR,
235 					     G_IO_ERROR_PARTIAL_INPUT,
236 					     _("Connection terminated unexpectedly"));
237 		}
238 		if (nread <= 0)
239 			return FALSE;
240 
241 		if (got_lf) {
242 			if (nread == 1 && old_len >= 2 &&
243 			    !strncmp ((char *)io->read_header_buf->data +
244 				      io->read_header_buf->len - 2,
245 				      "\n\n", 2)) {
246 				io->read_header_buf->len--;
247 				break;
248 			} else if (nread == 2 && old_len >= 3 &&
249 				 !strncmp ((char *)io->read_header_buf->data +
250 					   io->read_header_buf->len - 3,
251 					   "\n\r\n", 3)) {
252 				io->read_header_buf->len -= 2;
253 				break;
254 			}
255 		}
256 
257 		if (io->read_header_buf->len > HEADER_SIZE_LIMIT) {
258 			soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
259 			g_set_error_literal (error, G_IO_ERROR,
260 					     G_IO_ERROR_PARTIAL_INPUT,
261 					     _("Header too big"));
262 			return FALSE;
263 		}
264 	}
265 
266 	io->read_header_buf->data[io->read_header_buf->len] = '\0';
267 	return TRUE;
268 }
269 
270 static gint
processing_stage_cmp(gconstpointer a,gconstpointer b)271 processing_stage_cmp (gconstpointer a,
272 		    gconstpointer b)
273 {
274 	SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a));
275 	SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b));
276 
277 	if (stage_a > stage_b)
278 		return 1;
279 	if (stage_a == stage_b)
280 		return 0;
281 	return -1;
282 }
283 
284 GInputStream *
soup_message_setup_body_istream(GInputStream * body_stream,SoupMessage * msg,SoupSession * session,SoupProcessingStage start_at_stage)285 soup_message_setup_body_istream (GInputStream *body_stream,
286 				 SoupMessage *msg,
287 				 SoupSession *session,
288 				 SoupProcessingStage start_at_stage)
289 {
290 	GInputStream *istream;
291 	GSList *p, *processors;
292 
293 	istream = g_object_ref (body_stream);
294 
295 	processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
296 	processors = g_slist_sort (processors, processing_stage_cmp);
297 
298 	for (p = processors; p; p = p->next) {
299 		GInputStream *wrapper;
300 		SoupContentProcessor *processor;
301 
302 		processor = SOUP_CONTENT_PROCESSOR (p->data);
303 		if (soup_message_disables_feature (msg, p->data) ||
304 		    soup_content_processor_get_processing_stage (processor) < start_at_stage)
305 			continue;
306 
307 		wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
308 		if (wrapper) {
309 			g_object_unref (istream);
310 			istream = wrapper;
311 		}
312 	}
313 
314 	g_slist_free (processors);
315 
316 	return istream;
317 }
318 
319 static void
closed_async(GObject * source,GAsyncResult * result,gpointer user_data)320 closed_async (GObject      *source,
321 	      GAsyncResult *result,
322 	      gpointer      user_data)
323 {
324 	GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
325 	SoupMessage *msg = user_data;
326 	SoupMessageIOData *io;
327 	GCancellable *async_close_wait;
328 
329 	io = soup_message_get_io_data (msg);
330 	if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
331 		g_object_unref (msg);
332 		return;
333 	}
334 
335 	g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
336 	g_clear_object (&io->body_ostream);
337 
338 	async_close_wait = io->async_close_wait;
339 	io->async_close_wait = NULL;
340 	g_cancellable_cancel (async_close_wait);
341 	g_object_unref (async_close_wait);
342 
343 	g_object_unref (msg);
344 }
345 
346 /*
347  * There are two request/response formats: the basic request/response,
348  * possibly with one or more unsolicited informational responses (such
349  * as the WebDAV "102 Processing" response):
350  *
351  *     Client                            Server
352  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
353  *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
354  *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
355  *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
356  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
357  *      W:DONE     / R:DONE               R:DONE     / W:DONE
358  *
359  * and the "Expect: 100-continue" request/response, with the client
360  * blocking halfway through its request, and then either continuing or
361  * aborting, depending on the server response:
362  *
363  *     Client                            Server
364  *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
365  *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
366  *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
367  *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
368  *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
369  *      W:DONE     / R:DONE               R:DONE     / W:DONE
370  */
371 
372 /* Attempts to push forward the writing side of @msg's I/O. Returns
373  * %TRUE if it manages to make some progress, and it is likely that
374  * further progress can be made. Returns %FALSE if it has reached a
375  * stopping point of some sort (need input from the application,
376  * socket not writable, write is complete, etc).
377  */
378 static gboolean
io_write(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)379 io_write (SoupMessage *msg, gboolean blocking,
380 	  GCancellable *cancellable, GError **error)
381 {
382 	SoupMessageIOData *io = soup_message_get_io_data (msg);
383 	SoupBuffer *chunk;
384 	gssize nwrote;
385 
386 	if (io->async_close_error) {
387 		g_propagate_error (error, io->async_close_error);
388 		io->async_close_error = NULL;
389 		return FALSE;
390 	} else if (io->async_close_wait) {
391 		g_set_error_literal (error, G_IO_ERROR,
392 				     G_IO_ERROR_WOULD_BLOCK,
393 				     _("Operation would block"));
394 		return FALSE;
395 	}
396 
397 	switch (io->write_state) {
398 	case SOUP_MESSAGE_IO_STATE_HEADERS:
399 		if (io->mode == SOUP_MESSAGE_IO_SERVER &&
400 		    io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING &&
401 		    msg->status_code == 0) {
402 			/* Client requested "Expect: 100-continue", and
403 			 * server did not set an error.
404 			 */
405 			soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
406 		}
407 
408 		if (!io->write_buf->len) {
409 			io->get_headers_cb (msg, io->write_buf,
410 					    &io->write_encoding,
411 					    io->header_data);
412 		}
413 
414 		while (io->written < io->write_buf->len) {
415 			nwrote = g_pollable_stream_write (io->ostream,
416 							  io->write_buf->str + io->written,
417 							  io->write_buf->len - io->written,
418 							  blocking,
419 							  cancellable, error);
420 			if (nwrote == -1)
421 				return FALSE;
422 			io->written += nwrote;
423 		}
424 
425 		io->written = 0;
426 		g_string_truncate (io->write_buf, 0);
427 
428 		if (io->mode == SOUP_MESSAGE_IO_SERVER &&
429 		    SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
430 			if (msg->status_code == SOUP_STATUS_CONTINUE) {
431 				/* Stop and wait for the body now */
432 				io->write_state =
433 					SOUP_MESSAGE_IO_STATE_BLOCKING;
434 				io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
435 			} else {
436 				/* We just wrote a 1xx response
437 				 * header, so stay in STATE_HEADERS.
438 				 * (The caller will pause us from the
439 				 * wrote_informational callback if he
440 				 * is not ready to send the final
441 				 * response.)
442 				 */
443 			}
444 
445 			soup_message_wrote_informational (msg);
446 
447 			/* If this was "101 Switching Protocols", then
448 			 * the server probably stole the connection...
449 			 */
450 			if (io != soup_message_get_io_data (msg))
451 				return FALSE;
452 
453 			soup_message_cleanup_response (msg);
454 			break;
455 		}
456 
457 		if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
458 			SoupMessageHeaders *hdrs =
459 				(io->mode == SOUP_MESSAGE_IO_CLIENT) ?
460 				msg->request_headers : msg->response_headers;
461 			io->write_length = soup_message_headers_get_content_length (hdrs);
462 		}
463 
464 		if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
465 		    soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
466 			/* Need to wait for the Continue response */
467 			io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
468 			io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
469 		} else {
470 			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
471 
472 			/* If the client was waiting for a Continue
473 			 * but we sent something else, then they're
474 			 * now done writing.
475 			 */
476 			if (io->mode == SOUP_MESSAGE_IO_SERVER &&
477 			    io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
478 				io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
479 		}
480 
481 		soup_message_wrote_headers (msg);
482 		break;
483 
484 
485 	case SOUP_MESSAGE_IO_STATE_BODY_START:
486 		io->body_ostream = soup_body_output_stream_new (io->ostream,
487 								io->write_encoding,
488 								io->write_length);
489 		io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
490 		break;
491 
492 
493 	case SOUP_MESSAGE_IO_STATE_BODY:
494 		if (!io->write_length &&
495 		    io->write_encoding != SOUP_ENCODING_EOF &&
496 		    io->write_encoding != SOUP_ENCODING_CHUNKED) {
497 			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
498 			break;
499 		}
500 
501 		if (!io->write_chunk) {
502 			io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
503 			if (!io->write_chunk) {
504 				g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
505 				soup_message_io_pause (msg);
506 				return FALSE;
507 			}
508 			if (!io->write_chunk->length) {
509 				io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
510 				break;
511 			}
512 		}
513 
514 		nwrote = g_pollable_stream_write (io->body_ostream,
515 						  io->write_chunk->data + io->written,
516 						  io->write_chunk->length - io->written,
517 						  blocking,
518 						  cancellable, error);
519 		if (nwrote == -1)
520 			return FALSE;
521 
522 		chunk = soup_buffer_new_subbuffer (io->write_chunk,
523 						   io->written, nwrote);
524 		io->written += nwrote;
525 		if (io->write_length)
526 			io->write_length -= nwrote;
527 
528 		if (io->written == io->write_chunk->length)
529 			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
530 
531 		soup_message_wrote_body_data (msg, chunk);
532 		soup_buffer_free (chunk);
533 		break;
534 
535 
536 	case SOUP_MESSAGE_IO_STATE_BODY_DATA:
537 		io->written = 0;
538 		if (io->write_chunk->length == 0) {
539 			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
540 			break;
541 		}
542 
543 		if (io->mode == SOUP_MESSAGE_IO_SERVER ||
544 		    soup_message_get_flags (msg) & SOUP_MESSAGE_CAN_REBUILD)
545 			soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
546 		io->write_body_offset += io->write_chunk->length;
547 		soup_buffer_free (io->write_chunk);
548 		io->write_chunk = NULL;
549 
550 		io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
551 		soup_message_wrote_chunk (msg);
552 		break;
553 
554 
555 	case SOUP_MESSAGE_IO_STATE_BODY_FLUSH:
556 		if (io->body_ostream) {
557 			if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) {
558 				if (!g_output_stream_close (io->body_ostream, cancellable, error))
559 					return FALSE;
560 				g_clear_object (&io->body_ostream);
561 			} else {
562 				io->async_close_wait = g_cancellable_new ();
563 				if (io->async_context)
564 					g_main_context_push_thread_default (io->async_context);
565 				g_output_stream_close_async (io->body_ostream,
566 							     G_PRIORITY_DEFAULT, cancellable,
567 							     closed_async, g_object_ref (msg));
568 				if (io->async_context)
569 					g_main_context_pop_thread_default (io->async_context);
570 			}
571 		}
572 
573 		io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
574 		break;
575 
576 
577 	case SOUP_MESSAGE_IO_STATE_BODY_DONE:
578 		io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
579 		soup_message_wrote_body (msg);
580 		break;
581 
582 
583 	case SOUP_MESSAGE_IO_STATE_FINISHING:
584 		io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
585 
586 		if (io->mode == SOUP_MESSAGE_IO_CLIENT)
587 			io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
588 		break;
589 
590 
591 	default:
592 		g_return_val_if_reached (FALSE);
593 	}
594 
595 	return TRUE;
596 }
597 
598 /* Attempts to push forward the reading side of @msg's I/O. Returns
599  * %TRUE if it manages to make some progress, and it is likely that
600  * further progress can be made. Returns %FALSE if it has reached a
601  * stopping point of some sort (need input from the application,
602  * socket not readable, read is complete, etc).
603  */
604 static gboolean
io_read(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)605 io_read (SoupMessage *msg, gboolean blocking,
606 	 GCancellable *cancellable, GError **error)
607 {
608 	SoupMessageIOData *io = soup_message_get_io_data (msg);
609 	guchar *stack_buf = NULL;
610 	gssize nread;
611 	SoupBuffer *buffer;
612 	guint status;
613 
614 	switch (io->read_state) {
615 	case SOUP_MESSAGE_IO_STATE_HEADERS:
616 		if (!read_headers (msg, blocking, cancellable, error))
617 			return FALSE;
618 
619 		status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
620 					       io->read_header_buf->len,
621 					       &io->read_encoding,
622 					       io->header_data, error);
623 		g_byte_array_set_size (io->read_header_buf, 0);
624 
625 		if (status != SOUP_STATUS_OK) {
626 			/* Either we couldn't parse the headers, or they
627 			 * indicated something that would mean we wouldn't
628 			 * be able to parse the body. (Eg, unknown
629 			 * Transfer-Encoding.). Skip the rest of the
630 			 * reading, and make sure the connection gets
631 			 * closed when we're done.
632 			 */
633 			soup_message_set_status (msg, status);
634 			soup_message_headers_append (msg->request_headers,
635 						     "Connection", "close");
636 			io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
637 			break;
638 		}
639 
640 		if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
641 		    SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
642 			if (msg->status_code == SOUP_STATUS_CONTINUE &&
643 			    io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
644 				/* Pause the reader, unpause the writer */
645 				io->read_state =
646 					SOUP_MESSAGE_IO_STATE_BLOCKING;
647 				io->write_state =
648 					SOUP_MESSAGE_IO_STATE_BODY_START;
649 			} else {
650 				/* Just stay in HEADERS */
651 				io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
652 			}
653 
654 			/* Informational responses have no bodies, so
655 			 * bail out here rather than parsing encoding, etc
656 			 */
657 			soup_message_got_informational (msg);
658 
659 			/* If this was "101 Switching Protocols", then
660 			 * the session may have stolen the connection...
661 			 */
662 			if (io != soup_message_get_io_data (msg))
663 				return FALSE;
664 
665 			soup_message_cleanup_response (msg);
666 			break;
667 		} else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
668 			   soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
669 			/* We must return a status code and response
670 			 * headers to the client; either an error to
671 			 * be set by a got-headers handler below, or
672 			 * else %SOUP_STATUS_CONTINUE otherwise.
673 			 */
674 			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
675 			io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
676 		} else {
677 			io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
678 
679 			/* If the client was waiting for a Continue
680 			 * but got something else, then it's done
681 			 * writing.
682 			 */
683 			if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
684 			    io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
685 				io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
686 		}
687 
688 		if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
689 			SoupMessageHeaders *hdrs =
690 				(io->mode == SOUP_MESSAGE_IO_CLIENT) ?
691 				msg->response_headers : msg->request_headers;
692 			io->read_length = soup_message_headers_get_content_length (hdrs);
693 
694 			if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
695 			    !soup_message_is_keepalive (msg)) {
696 				/* Some servers suck and send
697 				 * incorrect Content-Length values, so
698 				 * allow EOF termination in this case
699 				 * (iff the message is too short) too.
700 				 */
701 				io->read_encoding = SOUP_ENCODING_EOF;
702 			}
703 		} else
704 			io->read_length = -1;
705 
706 		soup_message_got_headers (msg);
707 		break;
708 
709 
710 	case SOUP_MESSAGE_IO_STATE_BODY_START:
711 		if (!io->body_istream) {
712 			GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
713 										 io->read_encoding,
714 										 io->read_length);
715 
716 			/* TODO: server-side messages do not have a io->item. This means
717 			 * that we cannot use content processors for them right now.
718 			 */
719 			if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
720 				io->body_istream = soup_message_setup_body_istream (body_istream, msg,
721 										    io->item->session,
722 										    SOUP_STAGE_MESSAGE_BODY);
723 				g_object_unref (body_istream);
724 			} else {
725 				io->body_istream = body_istream;
726 			}
727 		}
728 
729 		if (soup_message_get_content_sniffer (msg)) {
730 			SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
731 			const char *content_type;
732 			GHashTable *params;
733 
734 			if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
735 								   cancellable, error))
736 				return FALSE;
737 
738 			content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
739 			soup_message_content_sniffed (msg, content_type, params);
740 		}
741 
742 		io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
743 		break;
744 
745 
746 	case SOUP_MESSAGE_IO_STATE_BODY:
747 		if (soup_message_has_chunk_allocator (msg)) {
748 			buffer = soup_message_allocate_chunk (msg, io->read_length);
749 			if (!buffer) {
750 				g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
751 				soup_message_io_pause (msg);
752 				return FALSE;
753 			}
754 		} else {
755 			if (!stack_buf)
756 				stack_buf = alloca (RESPONSE_BLOCK_SIZE);
757 			buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
758 						  stack_buf,
759 						  RESPONSE_BLOCK_SIZE);
760 		}
761 
762 		nread = g_pollable_stream_read (io->body_istream,
763 						(guchar *)buffer->data,
764 						buffer->length,
765 						blocking,
766 						cancellable, error);
767 		if (nread > 0) {
768 			buffer->length = nread;
769 			soup_message_body_got_chunk (io->read_body, buffer);
770 			soup_message_got_chunk (msg, buffer);
771 			soup_buffer_free (buffer);
772 			break;
773 		}
774 
775 		soup_buffer_free (buffer);
776 		if (nread == -1)
777 			return FALSE;
778 
779 		/* else nread == 0 */
780 		io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
781 		break;
782 
783 
784 	case SOUP_MESSAGE_IO_STATE_BODY_DONE:
785 		io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
786 		soup_message_got_body (msg);
787 		break;
788 
789 
790 	case SOUP_MESSAGE_IO_STATE_FINISHING:
791 		io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
792 
793 		if (io->mode == SOUP_MESSAGE_IO_SERVER)
794 			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
795 		break;
796 
797 
798 	default:
799 		g_return_val_if_reached (FALSE);
800 	}
801 
802 	return TRUE;
803 }
804 
805 typedef struct {
806 	GSource source;
807 	SoupMessage *msg;
808 	gboolean paused;
809 } SoupMessageSource;
810 
811 static gboolean
message_source_check(GSource * source)812 message_source_check (GSource *source)
813 {
814 	SoupMessageSource *message_source = (SoupMessageSource *)source;
815 
816 	if (message_source->paused) {
817 		SoupMessageIOData *io = soup_message_get_io_data (message_source->msg);
818 
819 		if (io && io->paused)
820 			return FALSE;
821 		else
822 			return TRUE;
823 	} else
824 		return FALSE;
825 }
826 
827 static gboolean
message_source_prepare(GSource * source,gint * timeout)828 message_source_prepare (GSource *source,
829 			gint    *timeout)
830 {
831 	*timeout = -1;
832 	return message_source_check (source);
833 }
834 
835 static gboolean
message_source_dispatch(GSource * source,GSourceFunc callback,gpointer user_data)836 message_source_dispatch (GSource     *source,
837 			 GSourceFunc  callback,
838 			 gpointer     user_data)
839 {
840 	SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
841 	SoupMessageSource *message_source = (SoupMessageSource *)source;
842 
843 	return (*func) (message_source->msg, user_data);
844 }
845 
846 static void
message_source_finalize(GSource * source)847 message_source_finalize (GSource *source)
848 {
849 	SoupMessageSource *message_source = (SoupMessageSource *)source;
850 
851 	g_object_unref (message_source->msg);
852 }
853 
854 static gboolean
message_source_closure_callback(SoupMessage * msg,gpointer data)855 message_source_closure_callback (SoupMessage *msg,
856 				 gpointer     data)
857 {
858 	GClosure *closure = data;
859 	GValue param = G_VALUE_INIT;
860 	GValue result_value = G_VALUE_INIT;
861 	gboolean result;
862 
863 	g_value_init (&result_value, G_TYPE_BOOLEAN);
864 
865 	g_value_init (&param, SOUP_TYPE_MESSAGE);
866 	g_value_set_object (&param, msg);
867 
868 	g_closure_invoke (closure, &result_value, 1, &param, NULL);
869 
870 	result = g_value_get_boolean (&result_value);
871 	g_value_unset (&result_value);
872 	g_value_unset (&param);
873 
874 	return result;
875 }
876 
877 static GSourceFuncs message_source_funcs =
878 {
879 	message_source_prepare,
880 	message_source_check,
881 	message_source_dispatch,
882 	message_source_finalize,
883 	(GSourceFunc)message_source_closure_callback,
884 	(GSourceDummyMarshal)g_cclosure_marshal_generic,
885 };
886 
887 GSource *
soup_message_io_get_source(SoupMessage * msg,GCancellable * cancellable,SoupMessageSourceFunc callback,gpointer user_data)888 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
889 			    SoupMessageSourceFunc callback, gpointer user_data)
890 {
891 	SoupMessageIOData *io = soup_message_get_io_data (msg);
892 	GSource *base_source, *source;
893 	SoupMessageSource *message_source;
894 
895 	if (!io) {
896 		base_source = g_timeout_source_new (0);
897 	} else if (io->paused) {
898 		base_source = NULL;
899 	} else if (io->async_close_wait) {
900 		base_source = g_cancellable_source_new (io->async_close_wait);
901 	} else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
902 		GPollableInputStream *istream;
903 
904 		if (io->body_istream)
905 			istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
906 		else
907 			istream = G_POLLABLE_INPUT_STREAM (io->istream);
908 		base_source = g_pollable_input_stream_create_source (istream, cancellable);
909 	} else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
910 		GPollableOutputStream *ostream;
911 
912 		if (io->body_ostream)
913 			ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
914 		else
915 			ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
916 		base_source = g_pollable_output_stream_create_source (ostream, cancellable);
917 	} else
918 		base_source = g_timeout_source_new (0);
919 
920 	source = g_source_new (&message_source_funcs,
921 			       sizeof (SoupMessageSource));
922 	g_source_set_name (source, "SoupMessageSource");
923 	message_source = (SoupMessageSource *)source;
924 	message_source->msg = g_object_ref (msg);
925 	message_source->paused = io && io->paused;
926 
927 	if (base_source) {
928 		g_source_set_dummy_callback (base_source);
929 		g_source_add_child_source (source, base_source);
930 		g_source_unref (base_source);
931 	}
932 	g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
933 	return source;
934 }
935 
936 static gboolean
request_is_restartable(SoupMessage * msg,GError * error)937 request_is_restartable (SoupMessage *msg, GError *error)
938 {
939 	SoupMessageIOData *io = soup_message_get_io_data (msg);
940 
941 	if (!io)
942 		return FALSE;
943 
944 	return (io->mode == SOUP_MESSAGE_IO_CLIENT &&
945 		io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
946 		io->read_header_buf->len == 0 &&
947 		soup_connection_get_ever_used (io->item->conn) &&
948 		!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
949 		!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
950 		error->domain != G_TLS_ERROR &&
951 		SOUP_METHOD_IS_IDEMPOTENT (msg->method));
952 }
953 
954 static gboolean
io_run_until(SoupMessage * msg,gboolean blocking,SoupMessageIOState read_state,SoupMessageIOState write_state,GCancellable * cancellable,GError ** error)955 io_run_until (SoupMessage *msg, gboolean blocking,
956 	      SoupMessageIOState read_state, SoupMessageIOState write_state,
957 	      GCancellable *cancellable, GError **error)
958 {
959 	SoupMessageIOData *io = soup_message_get_io_data (msg);
960 	gboolean progress = TRUE, done;
961 	GError *my_error = NULL;
962 
963 	if (g_cancellable_set_error_if_cancelled (cancellable, error))
964 		return FALSE;
965 	else if (!io) {
966 		g_set_error_literal (error, G_IO_ERROR,
967 				     G_IO_ERROR_CANCELLED,
968 				     _("Operation was cancelled"));
969 		return FALSE;
970 	}
971 
972 	g_object_ref (msg);
973 
974 	while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait &&
975 	       (io->read_state < read_state || io->write_state < write_state)) {
976 
977 		if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
978 			progress = io_read (msg, blocking, cancellable, &my_error);
979 		else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
980 			progress = io_write (msg, blocking, cancellable, &my_error);
981 		else
982 			progress = FALSE;
983 	}
984 
985 	if (my_error) {
986 		if (request_is_restartable (msg, my_error)) {
987 			/* Connection got closed, but we can safely try again */
988 			g_error_free (my_error);
989 			g_set_error_literal (error, SOUP_HTTP_ERROR,
990 					     SOUP_STATUS_TRY_AGAIN, "");
991 			g_object_unref (msg);
992 			return FALSE;
993 		}
994 
995 		g_propagate_error (error, my_error);
996 		g_object_unref (msg);
997 		return FALSE;
998 	} else if (soup_message_get_io_data (msg) != io) {
999 		g_set_error_literal (error, G_IO_ERROR,
1000 				     G_IO_ERROR_CANCELLED,
1001 				     _("Operation was cancelled"));
1002 		g_object_unref (msg);
1003 		return FALSE;
1004 	} else if (!io->async_close_wait &&
1005 		   g_cancellable_set_error_if_cancelled (cancellable, error)) {
1006 		g_object_unref (msg);
1007 		return FALSE;
1008 	}
1009 
1010 	done = (io->read_state >= read_state &&
1011 		io->write_state >= write_state);
1012 
1013 	if (!blocking && !done) {
1014 		g_set_error_literal (error, G_IO_ERROR,
1015 				     G_IO_ERROR_WOULD_BLOCK,
1016 				     _("Operation would block"));
1017 		g_object_unref (msg);
1018 		return FALSE;
1019 	}
1020 
1021 	g_object_unref (msg);
1022 	return done;
1023 }
1024 
1025 static gboolean
io_run_ready(SoupMessage * msg,gpointer user_data)1026 io_run_ready (SoupMessage *msg, gpointer user_data)
1027 {
1028 	io_run (msg, FALSE);
1029 	return FALSE;
1030 }
1031 
1032 static void
io_run(SoupMessage * msg,gboolean blocking)1033 io_run (SoupMessage *msg, gboolean blocking)
1034 {
1035 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1036 	GError *error = NULL;
1037 	GCancellable *cancellable;
1038 
1039 	if (io->io_source) {
1040 		g_source_destroy (io->io_source);
1041 		g_source_unref (io->io_source);
1042 		io->io_source = NULL;
1043 	}
1044 
1045 	g_object_ref (msg);
1046 	cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
1047 
1048 	if (io_run_until (msg, blocking,
1049 			  SOUP_MESSAGE_IO_STATE_DONE,
1050 			  SOUP_MESSAGE_IO_STATE_DONE,
1051 			  cancellable, &error)) {
1052 		soup_message_io_finished (msg);
1053 	} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1054 		g_clear_error (&error);
1055 		io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
1056 		g_source_attach (io->io_source, io->async_context);
1057 	} else if (error && soup_message_get_io_data (msg) == io) {
1058 		if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
1059 			io->item->state = SOUP_MESSAGE_RESTARTING;
1060 		else if (error->domain == G_TLS_ERROR) {
1061 			soup_message_set_status_full (msg,
1062 						      SOUP_STATUS_SSL_FAILED,
1063 						      error->message);
1064 		} else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
1065 			soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
1066 
1067 		g_error_free (error);
1068 		soup_message_io_finished (msg);
1069 	} else if (error)
1070 		g_error_free (error);
1071 
1072 	g_object_unref (msg);
1073 	g_clear_object (&cancellable);
1074 }
1075 
1076 gboolean
soup_message_io_run_until_write(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)1077 soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
1078 				 GCancellable *cancellable, GError **error)
1079 {
1080 	return io_run_until (msg, blocking,
1081 			     SOUP_MESSAGE_IO_STATE_ANY,
1082 			     SOUP_MESSAGE_IO_STATE_BODY,
1083 			     cancellable, error);
1084 }
1085 
1086 gboolean
soup_message_io_run_until_read(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)1087 soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
1088 				GCancellable *cancellable, GError **error)
1089 {
1090 	return io_run_until (msg, blocking,
1091 			     SOUP_MESSAGE_IO_STATE_BODY,
1092 			     SOUP_MESSAGE_IO_STATE_ANY,
1093 			     cancellable, error);
1094 }
1095 
1096 gboolean
soup_message_io_run_until_finish(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)1097 soup_message_io_run_until_finish (SoupMessage   *msg,
1098 				  gboolean       blocking,
1099 				  GCancellable  *cancellable,
1100 				  GError       **error)
1101 {
1102 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1103 	gboolean success;
1104 
1105 	g_object_ref (msg);
1106 
1107 	if (io) {
1108 		g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, FALSE);
1109 
1110 		if (io->read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
1111 			io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
1112 	}
1113 
1114 	success = io_run_until (msg, blocking,
1115 				SOUP_MESSAGE_IO_STATE_DONE,
1116 				SOUP_MESSAGE_IO_STATE_DONE,
1117 				cancellable, error);
1118 
1119 	g_object_unref (msg);
1120 	return success;
1121 }
1122 
1123 static void
client_stream_eof(SoupClientInputStream * stream,gpointer user_data)1124 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
1125 {
1126 	SoupMessage *msg = user_data;
1127 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1128 
1129 	if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
1130 		io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
1131 }
1132 
1133 GInputStream *
soup_message_io_get_response_istream(SoupMessage * msg,GError ** error)1134 soup_message_io_get_response_istream (SoupMessage  *msg,
1135 				      GError      **error)
1136 {
1137 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1138 	GInputStream *client_stream;
1139 
1140 	g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
1141 
1142 	if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
1143 		g_set_error_literal (error, SOUP_HTTP_ERROR,
1144 				     msg->status_code, msg->reason_phrase);
1145 		return NULL;
1146 	}
1147 
1148 	client_stream = soup_client_input_stream_new (io->body_istream, msg);
1149 	g_signal_connect (client_stream, "eof",
1150 			  G_CALLBACK (client_stream_eof), msg);
1151 
1152 	return client_stream;
1153 }
1154 
1155 
1156 static SoupMessageIOData *
new_iostate(SoupMessage * msg,GIOStream * iostream,GMainContext * async_context,SoupMessageIOMode mode,SoupMessageGetHeadersFn get_headers_cb,SoupMessageParseHeadersFn parse_headers_cb,gpointer header_data,SoupMessageCompletionFn completion_cb,gpointer completion_data)1157 new_iostate (SoupMessage *msg, GIOStream *iostream,
1158 	     GMainContext *async_context, SoupMessageIOMode mode,
1159 	     SoupMessageGetHeadersFn get_headers_cb,
1160 	     SoupMessageParseHeadersFn parse_headers_cb,
1161 	     gpointer header_data,
1162 	     SoupMessageCompletionFn completion_cb,
1163 	     gpointer completion_data)
1164 {
1165 	SoupMessageIOData *io;
1166 
1167 	io = g_slice_new0 (SoupMessageIOData);
1168 	io->mode = mode;
1169 	io->get_headers_cb   = get_headers_cb;
1170 	io->parse_headers_cb = parse_headers_cb;
1171 	io->header_data      = header_data;
1172 	io->completion_cb    = completion_cb;
1173 	io->completion_data  = completion_data;
1174 
1175 	io->iostream = g_object_ref (iostream);
1176 	io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1177 	io->ostream = g_io_stream_get_output_stream (iostream);
1178 
1179 	if (async_context)
1180 		io->async_context = g_main_context_ref (async_context);
1181 
1182 	io->read_header_buf = g_byte_array_new ();
1183 	io->write_buf       = g_string_new (NULL);
1184 
1185 	io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1186 	io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1187 
1188 	if (soup_message_get_io_data (msg))
1189 		soup_message_io_cleanup (msg);
1190 	soup_message_set_io_data (msg, io);
1191 	return io;
1192 }
1193 
1194 void
soup_message_io_client(SoupMessageQueueItem * item,GIOStream * iostream,GMainContext * async_context,SoupMessageGetHeadersFn get_headers_cb,SoupMessageParseHeadersFn parse_headers_cb,gpointer header_data,SoupMessageCompletionFn completion_cb,gpointer completion_data)1195 soup_message_io_client (SoupMessageQueueItem *item,
1196 			GIOStream *iostream,
1197 			GMainContext *async_context,
1198 			SoupMessageGetHeadersFn get_headers_cb,
1199 			SoupMessageParseHeadersFn parse_headers_cb,
1200 			gpointer header_data,
1201 			SoupMessageCompletionFn completion_cb,
1202 			gpointer completion_data)
1203 {
1204 	SoupMessageIOData *io;
1205 
1206 	io = new_iostate (item->msg, iostream, async_context,
1207 			  SOUP_MESSAGE_IO_CLIENT,
1208 			  get_headers_cb, parse_headers_cb, header_data,
1209 			  completion_cb, completion_data);
1210 
1211 	io->item = item;
1212 	soup_message_queue_item_ref (item);
1213 	io->cancellable = item->cancellable;
1214 
1215 	io->read_body       = item->msg->response_body;
1216 	io->write_body      = item->msg->request_body;
1217 
1218 	io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
1219 
1220 	if (!item->new_api) {
1221 		gboolean blocking =
1222 			SOUP_IS_SESSION_SYNC (item->session) ||
1223 			(!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
1224 		io_run (item->msg, blocking);
1225 	}
1226 }
1227 
1228 void
soup_message_io_server(SoupMessage * msg,GIOStream * iostream,GMainContext * async_context,SoupMessageGetHeadersFn get_headers_cb,SoupMessageParseHeadersFn parse_headers_cb,gpointer header_data,SoupMessageCompletionFn completion_cb,gpointer completion_data)1229 soup_message_io_server (SoupMessage *msg,
1230 			GIOStream *iostream, GMainContext *async_context,
1231 			SoupMessageGetHeadersFn get_headers_cb,
1232 			SoupMessageParseHeadersFn parse_headers_cb,
1233 			gpointer header_data,
1234 			SoupMessageCompletionFn completion_cb,
1235 			gpointer completion_data)
1236 {
1237 	SoupMessageIOData *io;
1238 
1239 	io = new_iostate (msg, iostream, async_context,
1240 			  SOUP_MESSAGE_IO_SERVER,
1241 			  get_headers_cb, parse_headers_cb, header_data,
1242 			  completion_cb, completion_data);
1243 
1244 	io->read_body       = msg->request_body;
1245 	io->write_body      = msg->response_body;
1246 
1247 	io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
1248 	io_run (msg, FALSE);
1249 }
1250 
1251 void
soup_message_io_pause(SoupMessage * msg)1252 soup_message_io_pause (SoupMessage *msg)
1253 {
1254 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1255 
1256 	g_return_if_fail (io != NULL);
1257 
1258 	if (io->item && io->item->new_api)
1259 		g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1260 
1261 	if (io->io_source) {
1262 		g_source_destroy (io->io_source);
1263 		g_source_unref (io->io_source);
1264 		io->io_source = NULL;
1265 	}
1266 
1267 	if (io->unpause_source) {
1268 		g_source_destroy (io->unpause_source);
1269 		g_source_unref (io->unpause_source);
1270 		io->unpause_source = NULL;
1271 	}
1272 
1273 	io->paused = TRUE;
1274 }
1275 
1276 static gboolean
io_unpause_internal(gpointer msg)1277 io_unpause_internal (gpointer msg)
1278 {
1279 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1280 
1281 	g_return_val_if_fail (io != NULL, FALSE);
1282 
1283 	g_clear_pointer (&io->unpause_source, g_source_unref);
1284 	io->paused = FALSE;
1285 
1286 	if (io->io_source)
1287 		return FALSE;
1288 
1289 	io_run (msg, FALSE);
1290 	return FALSE;
1291 }
1292 
1293 void
soup_message_io_unpause(SoupMessage * msg)1294 soup_message_io_unpause (SoupMessage *msg)
1295 {
1296 	SoupMessageIOData *io = soup_message_get_io_data (msg);
1297 
1298 	g_return_if_fail (io != NULL);
1299 
1300 	if (io->item && io->item->new_api) {
1301 		g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1302 		io->paused = FALSE;
1303 		return;
1304 	}
1305 
1306 	if (!io->unpause_source) {
1307 		io->unpause_source = soup_add_completion_reffed (io->async_context,
1308 								 io_unpause_internal, msg, NULL);
1309 	}
1310 }
1311 
1312 /**
1313  * soup_message_io_in_progress:
1314  * @msg: a #SoupMessage
1315  *
1316  * Tests whether or not I/O is currently in progress on @msg.
1317  *
1318  * Return value: whether or not I/O is currently in progress.
1319  **/
1320 gboolean
soup_message_io_in_progress(SoupMessage * msg)1321 soup_message_io_in_progress (SoupMessage *msg)
1322 {
1323 	return soup_message_get_io_data (msg) != NULL;
1324 }
1325