1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3 * soup-message-io.c: HTTP message I/O
4 *
5 * Copyright (C) 2000-2003, Ximian, Inc.
6 */
7
8 #ifdef HAVE_CONFIG_H
9 #include "config.h"
10 #endif
11
12 #include <glib/gi18n-lib.h>
13
14 #include "soup.h"
15 #include "soup-body-input-stream.h"
16 #include "soup-body-output-stream.h"
17 #include "soup-client-input-stream.h"
18 #include "soup-connection.h"
19 #include "soup-content-processor.h"
20 #include "soup-content-sniffer-stream.h"
21 #include "soup-filter-input-stream.h"
22 #include "soup-message-private.h"
23 #include "soup-message-queue.h"
24 #include "soup-misc-private.h"
25
26 typedef enum {
27 SOUP_MESSAGE_IO_CLIENT,
28 SOUP_MESSAGE_IO_SERVER
29 } SoupMessageIOMode;
30
31 typedef enum {
32 SOUP_MESSAGE_IO_STATE_NOT_STARTED,
33 SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
34 SOUP_MESSAGE_IO_STATE_HEADERS,
35 SOUP_MESSAGE_IO_STATE_BLOCKING,
36 SOUP_MESSAGE_IO_STATE_BODY_START,
37 SOUP_MESSAGE_IO_STATE_BODY,
38 SOUP_MESSAGE_IO_STATE_BODY_DATA,
39 SOUP_MESSAGE_IO_STATE_BODY_FLUSH,
40 SOUP_MESSAGE_IO_STATE_BODY_DONE,
41 SOUP_MESSAGE_IO_STATE_FINISHING,
42 SOUP_MESSAGE_IO_STATE_DONE
43 } SoupMessageIOState;
44
45 #define SOUP_MESSAGE_IO_STATE_ACTIVE(state) \
46 (state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
47 state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
48 state != SOUP_MESSAGE_IO_STATE_DONE)
49 #define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
50 (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
51 state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
52
53 typedef struct {
54 SoupMessageQueueItem *item;
55 SoupMessageIOMode mode;
56 GCancellable *cancellable;
57
58 GIOStream *iostream;
59 SoupFilterInputStream *istream;
60 GInputStream *body_istream;
61 GOutputStream *ostream;
62 GOutputStream *body_ostream;
63 GMainContext *async_context;
64
65 SoupMessageIOState read_state;
66 SoupEncoding read_encoding;
67 GByteArray *read_header_buf;
68 SoupMessageBody *read_body;
69 goffset read_length;
70
71 SoupMessageIOState write_state;
72 SoupEncoding write_encoding;
73 GString *write_buf;
74 SoupMessageBody *write_body;
75 SoupBuffer *write_chunk;
76 goffset write_body_offset;
77 goffset write_length;
78 goffset written;
79
80 GSource *io_source;
81 GSource *unpause_source;
82 gboolean paused;
83
84 GCancellable *async_close_wait;
85 GError *async_close_error;
86
87 SoupMessageGetHeadersFn get_headers_cb;
88 SoupMessageParseHeadersFn parse_headers_cb;
89 gpointer header_data;
90 SoupMessageCompletionFn completion_cb;
91 gpointer completion_data;
92 } SoupMessageIOData;
93
94 static void io_run (SoupMessage *msg, gboolean blocking);
95
96 #define RESPONSE_BLOCK_SIZE 8192
97 #define HEADER_SIZE_LIMIT (64 * 1024)
98
99 void
soup_message_io_cleanup(SoupMessage * msg)100 soup_message_io_cleanup (SoupMessage *msg)
101 {
102 SoupMessageIOData *io;
103
104 soup_message_io_stop (msg);
105
106 io = soup_message_get_io_data (msg);
107 if (!io)
108 return;
109 soup_message_set_io_data (msg, NULL);
110
111 if (io->iostream)
112 g_object_unref (io->iostream);
113 if (io->body_istream)
114 g_object_unref (io->body_istream);
115 if (io->body_ostream)
116 g_object_unref (io->body_ostream);
117 if (io->async_context)
118 g_main_context_unref (io->async_context);
119 if (io->item)
120 soup_message_queue_item_unref (io->item);
121
122 g_byte_array_free (io->read_header_buf, TRUE);
123
124 g_string_free (io->write_buf, TRUE);
125 if (io->write_chunk)
126 soup_buffer_free (io->write_chunk);
127
128 if (io->async_close_wait) {
129 g_cancellable_cancel (io->async_close_wait);
130 g_clear_object (&io->async_close_wait);
131 }
132 g_clear_error (&io->async_close_error);
133
134 g_slice_free (SoupMessageIOData, io);
135 }
136
137 void
soup_message_io_stop(SoupMessage * msg)138 soup_message_io_stop (SoupMessage *msg)
139 {
140 SoupMessageIOData *io = soup_message_get_io_data (msg);
141
142 if (!io)
143 return;
144
145 if (io->io_source) {
146 g_source_destroy (io->io_source);
147 g_source_unref (io->io_source);
148 io->io_source = NULL;
149 }
150
151 if (io->unpause_source) {
152 g_source_destroy (io->unpause_source);
153 g_source_unref (io->unpause_source);
154 io->unpause_source = NULL;
155 }
156 }
157
158 void
soup_message_io_finished(SoupMessage * msg)159 soup_message_io_finished (SoupMessage *msg)
160 {
161 SoupMessageIOData *io;
162 SoupMessageCompletionFn completion_cb;
163 gpointer completion_data;
164 SoupMessageIOCompletion completion;
165
166 io = soup_message_get_io_data (msg);
167 if (!io)
168 return;
169
170 completion_cb = io->completion_cb;
171 completion_data = io->completion_data;
172
173 if ((io->read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
174 io->write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
175 completion = SOUP_MESSAGE_IO_COMPLETE;
176 else
177 completion = SOUP_MESSAGE_IO_INTERRUPTED;
178
179 g_object_ref (msg);
180 soup_message_io_cleanup (msg);
181 if (completion_cb)
182 completion_cb (msg, completion, completion_data);
183 g_object_unref (msg);
184 }
185
186 GIOStream *
soup_message_io_steal(SoupMessage * msg)187 soup_message_io_steal (SoupMessage *msg)
188 {
189 SoupMessageIOData *io;
190 SoupMessageCompletionFn completion_cb;
191 gpointer completion_data;
192 GIOStream *iostream;
193
194 io = soup_message_get_io_data (msg);
195 if (!io || !io->iostream)
196 return NULL;
197
198 iostream = g_object_ref (io->iostream);
199 completion_cb = io->completion_cb;
200 completion_data = io->completion_data;
201
202 g_object_ref (msg);
203 soup_message_io_cleanup (msg);
204 if (completion_cb)
205 completion_cb (msg, SOUP_MESSAGE_IO_STOLEN, completion_data);
206 g_object_unref (msg);
207
208 return iostream;
209 }
210
211 static gboolean
read_headers(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)212 read_headers (SoupMessage *msg, gboolean blocking,
213 GCancellable *cancellable, GError **error)
214 {
215 SoupMessageIOData *io;
216 gssize nread, old_len;
217 gboolean got_lf;
218
219 io = soup_message_get_io_data (msg);
220 while (1) {
221 old_len = io->read_header_buf->len;
222 g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
223 nread = soup_filter_input_stream_read_line (io->istream,
224 io->read_header_buf->data + old_len,
225 RESPONSE_BLOCK_SIZE,
226 blocking,
227 &got_lf,
228 cancellable, error);
229 io->read_header_buf->len = old_len + MAX (nread, 0);
230 if (nread == 0) {
231 if (io->read_header_buf->len > 0)
232 break;
233 soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
234 g_set_error_literal (error, G_IO_ERROR,
235 G_IO_ERROR_PARTIAL_INPUT,
236 _("Connection terminated unexpectedly"));
237 }
238 if (nread <= 0)
239 return FALSE;
240
241 if (got_lf) {
242 if (nread == 1 && old_len >= 2 &&
243 !strncmp ((char *)io->read_header_buf->data +
244 io->read_header_buf->len - 2,
245 "\n\n", 2)) {
246 io->read_header_buf->len--;
247 break;
248 } else if (nread == 2 && old_len >= 3 &&
249 !strncmp ((char *)io->read_header_buf->data +
250 io->read_header_buf->len - 3,
251 "\n\r\n", 3)) {
252 io->read_header_buf->len -= 2;
253 break;
254 }
255 }
256
257 if (io->read_header_buf->len > HEADER_SIZE_LIMIT) {
258 soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
259 g_set_error_literal (error, G_IO_ERROR,
260 G_IO_ERROR_PARTIAL_INPUT,
261 _("Header too big"));
262 return FALSE;
263 }
264 }
265
266 io->read_header_buf->data[io->read_header_buf->len] = '\0';
267 return TRUE;
268 }
269
270 static gint
processing_stage_cmp(gconstpointer a,gconstpointer b)271 processing_stage_cmp (gconstpointer a,
272 gconstpointer b)
273 {
274 SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (a));
275 SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR (b));
276
277 if (stage_a > stage_b)
278 return 1;
279 if (stage_a == stage_b)
280 return 0;
281 return -1;
282 }
283
284 GInputStream *
soup_message_setup_body_istream(GInputStream * body_stream,SoupMessage * msg,SoupSession * session,SoupProcessingStage start_at_stage)285 soup_message_setup_body_istream (GInputStream *body_stream,
286 SoupMessage *msg,
287 SoupSession *session,
288 SoupProcessingStage start_at_stage)
289 {
290 GInputStream *istream;
291 GSList *p, *processors;
292
293 istream = g_object_ref (body_stream);
294
295 processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
296 processors = g_slist_sort (processors, processing_stage_cmp);
297
298 for (p = processors; p; p = p->next) {
299 GInputStream *wrapper;
300 SoupContentProcessor *processor;
301
302 processor = SOUP_CONTENT_PROCESSOR (p->data);
303 if (soup_message_disables_feature (msg, p->data) ||
304 soup_content_processor_get_processing_stage (processor) < start_at_stage)
305 continue;
306
307 wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
308 if (wrapper) {
309 g_object_unref (istream);
310 istream = wrapper;
311 }
312 }
313
314 g_slist_free (processors);
315
316 return istream;
317 }
318
319 static void
closed_async(GObject * source,GAsyncResult * result,gpointer user_data)320 closed_async (GObject *source,
321 GAsyncResult *result,
322 gpointer user_data)
323 {
324 GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
325 SoupMessage *msg = user_data;
326 SoupMessageIOData *io;
327 GCancellable *async_close_wait;
328
329 io = soup_message_get_io_data (msg);
330 if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
331 g_object_unref (msg);
332 return;
333 }
334
335 g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
336 g_clear_object (&io->body_ostream);
337
338 async_close_wait = io->async_close_wait;
339 io->async_close_wait = NULL;
340 g_cancellable_cancel (async_close_wait);
341 g_object_unref (async_close_wait);
342
343 g_object_unref (msg);
344 }
345
346 /*
347 * There are two request/response formats: the basic request/response,
348 * possibly with one or more unsolicited informational responses (such
349 * as the WebDAV "102 Processing" response):
350 *
351 * Client Server
352 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
353 * W:BODY / R:NOT_STARTED -> R:BODY / W:NOT_STARTED
354 * [W:DONE / R:HEADERS (1xx) <- R:DONE / W:HEADERS (1xx) ...]
355 * W:DONE / R:HEADERS <- R:DONE / W:HEADERS
356 * W:DONE / R:BODY <- R:DONE / W:BODY
357 * W:DONE / R:DONE R:DONE / W:DONE
358 *
359 * and the "Expect: 100-continue" request/response, with the client
360 * blocking halfway through its request, and then either continuing or
361 * aborting, depending on the server response:
362 *
363 * Client Server
364 * W:HEADERS / R:NOT_STARTED -> R:HEADERS / W:NOT_STARTED
365 * W:BLOCKING / R:HEADERS <- R:BLOCKING / W:HEADERS
366 * [W:BODY / R:BLOCKING -> R:BODY / W:BLOCKING]
367 * [W:DONE / R:HEADERS <- R:DONE / W:HEADERS]
368 * W:DONE / R:BODY <- R:DONE / W:BODY
369 * W:DONE / R:DONE R:DONE / W:DONE
370 */
371
372 /* Attempts to push forward the writing side of @msg's I/O. Returns
373 * %TRUE if it manages to make some progress, and it is likely that
374 * further progress can be made. Returns %FALSE if it has reached a
375 * stopping point of some sort (need input from the application,
376 * socket not writable, write is complete, etc).
377 */
378 static gboolean
io_write(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)379 io_write (SoupMessage *msg, gboolean blocking,
380 GCancellable *cancellable, GError **error)
381 {
382 SoupMessageIOData *io = soup_message_get_io_data (msg);
383 SoupBuffer *chunk;
384 gssize nwrote;
385
386 if (io->async_close_error) {
387 g_propagate_error (error, io->async_close_error);
388 io->async_close_error = NULL;
389 return FALSE;
390 } else if (io->async_close_wait) {
391 g_set_error_literal (error, G_IO_ERROR,
392 G_IO_ERROR_WOULD_BLOCK,
393 _("Operation would block"));
394 return FALSE;
395 }
396
397 switch (io->write_state) {
398 case SOUP_MESSAGE_IO_STATE_HEADERS:
399 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
400 io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING &&
401 msg->status_code == 0) {
402 /* Client requested "Expect: 100-continue", and
403 * server did not set an error.
404 */
405 soup_message_set_status (msg, SOUP_STATUS_CONTINUE);
406 }
407
408 if (!io->write_buf->len) {
409 io->get_headers_cb (msg, io->write_buf,
410 &io->write_encoding,
411 io->header_data);
412 }
413
414 while (io->written < io->write_buf->len) {
415 nwrote = g_pollable_stream_write (io->ostream,
416 io->write_buf->str + io->written,
417 io->write_buf->len - io->written,
418 blocking,
419 cancellable, error);
420 if (nwrote == -1)
421 return FALSE;
422 io->written += nwrote;
423 }
424
425 io->written = 0;
426 g_string_truncate (io->write_buf, 0);
427
428 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
429 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
430 if (msg->status_code == SOUP_STATUS_CONTINUE) {
431 /* Stop and wait for the body now */
432 io->write_state =
433 SOUP_MESSAGE_IO_STATE_BLOCKING;
434 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
435 } else {
436 /* We just wrote a 1xx response
437 * header, so stay in STATE_HEADERS.
438 * (The caller will pause us from the
439 * wrote_informational callback if he
440 * is not ready to send the final
441 * response.)
442 */
443 }
444
445 soup_message_wrote_informational (msg);
446
447 /* If this was "101 Switching Protocols", then
448 * the server probably stole the connection...
449 */
450 if (io != soup_message_get_io_data (msg))
451 return FALSE;
452
453 soup_message_cleanup_response (msg);
454 break;
455 }
456
457 if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
458 SoupMessageHeaders *hdrs =
459 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
460 msg->request_headers : msg->response_headers;
461 io->write_length = soup_message_headers_get_content_length (hdrs);
462 }
463
464 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
465 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
466 /* Need to wait for the Continue response */
467 io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
468 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
469 } else {
470 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
471
472 /* If the client was waiting for a Continue
473 * but we sent something else, then they're
474 * now done writing.
475 */
476 if (io->mode == SOUP_MESSAGE_IO_SERVER &&
477 io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
478 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
479 }
480
481 soup_message_wrote_headers (msg);
482 break;
483
484
485 case SOUP_MESSAGE_IO_STATE_BODY_START:
486 io->body_ostream = soup_body_output_stream_new (io->ostream,
487 io->write_encoding,
488 io->write_length);
489 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
490 break;
491
492
493 case SOUP_MESSAGE_IO_STATE_BODY:
494 if (!io->write_length &&
495 io->write_encoding != SOUP_ENCODING_EOF &&
496 io->write_encoding != SOUP_ENCODING_CHUNKED) {
497 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
498 break;
499 }
500
501 if (!io->write_chunk) {
502 io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
503 if (!io->write_chunk) {
504 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
505 soup_message_io_pause (msg);
506 return FALSE;
507 }
508 if (!io->write_chunk->length) {
509 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
510 break;
511 }
512 }
513
514 nwrote = g_pollable_stream_write (io->body_ostream,
515 io->write_chunk->data + io->written,
516 io->write_chunk->length - io->written,
517 blocking,
518 cancellable, error);
519 if (nwrote == -1)
520 return FALSE;
521
522 chunk = soup_buffer_new_subbuffer (io->write_chunk,
523 io->written, nwrote);
524 io->written += nwrote;
525 if (io->write_length)
526 io->write_length -= nwrote;
527
528 if (io->written == io->write_chunk->length)
529 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
530
531 soup_message_wrote_body_data (msg, chunk);
532 soup_buffer_free (chunk);
533 break;
534
535
536 case SOUP_MESSAGE_IO_STATE_BODY_DATA:
537 io->written = 0;
538 if (io->write_chunk->length == 0) {
539 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
540 break;
541 }
542
543 if (io->mode == SOUP_MESSAGE_IO_SERVER ||
544 soup_message_get_flags (msg) & SOUP_MESSAGE_CAN_REBUILD)
545 soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
546 io->write_body_offset += io->write_chunk->length;
547 soup_buffer_free (io->write_chunk);
548 io->write_chunk = NULL;
549
550 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
551 soup_message_wrote_chunk (msg);
552 break;
553
554
555 case SOUP_MESSAGE_IO_STATE_BODY_FLUSH:
556 if (io->body_ostream) {
557 if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) {
558 if (!g_output_stream_close (io->body_ostream, cancellable, error))
559 return FALSE;
560 g_clear_object (&io->body_ostream);
561 } else {
562 io->async_close_wait = g_cancellable_new ();
563 if (io->async_context)
564 g_main_context_push_thread_default (io->async_context);
565 g_output_stream_close_async (io->body_ostream,
566 G_PRIORITY_DEFAULT, cancellable,
567 closed_async, g_object_ref (msg));
568 if (io->async_context)
569 g_main_context_pop_thread_default (io->async_context);
570 }
571 }
572
573 io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
574 break;
575
576
577 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
578 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
579 soup_message_wrote_body (msg);
580 break;
581
582
583 case SOUP_MESSAGE_IO_STATE_FINISHING:
584 io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
585
586 if (io->mode == SOUP_MESSAGE_IO_CLIENT)
587 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
588 break;
589
590
591 default:
592 g_return_val_if_reached (FALSE);
593 }
594
595 return TRUE;
596 }
597
598 /* Attempts to push forward the reading side of @msg's I/O. Returns
599 * %TRUE if it manages to make some progress, and it is likely that
600 * further progress can be made. Returns %FALSE if it has reached a
601 * stopping point of some sort (need input from the application,
602 * socket not readable, read is complete, etc).
603 */
604 static gboolean
io_read(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)605 io_read (SoupMessage *msg, gboolean blocking,
606 GCancellable *cancellable, GError **error)
607 {
608 SoupMessageIOData *io = soup_message_get_io_data (msg);
609 guchar *stack_buf = NULL;
610 gssize nread;
611 SoupBuffer *buffer;
612 guint status;
613
614 switch (io->read_state) {
615 case SOUP_MESSAGE_IO_STATE_HEADERS:
616 if (!read_headers (msg, blocking, cancellable, error))
617 return FALSE;
618
619 status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
620 io->read_header_buf->len,
621 &io->read_encoding,
622 io->header_data, error);
623 g_byte_array_set_size (io->read_header_buf, 0);
624
625 if (status != SOUP_STATUS_OK) {
626 /* Either we couldn't parse the headers, or they
627 * indicated something that would mean we wouldn't
628 * be able to parse the body. (Eg, unknown
629 * Transfer-Encoding.). Skip the rest of the
630 * reading, and make sure the connection gets
631 * closed when we're done.
632 */
633 soup_message_set_status (msg, status);
634 soup_message_headers_append (msg->request_headers,
635 "Connection", "close");
636 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
637 break;
638 }
639
640 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
641 SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
642 if (msg->status_code == SOUP_STATUS_CONTINUE &&
643 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
644 /* Pause the reader, unpause the writer */
645 io->read_state =
646 SOUP_MESSAGE_IO_STATE_BLOCKING;
647 io->write_state =
648 SOUP_MESSAGE_IO_STATE_BODY_START;
649 } else {
650 /* Just stay in HEADERS */
651 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
652 }
653
654 /* Informational responses have no bodies, so
655 * bail out here rather than parsing encoding, etc
656 */
657 soup_message_got_informational (msg);
658
659 /* If this was "101 Switching Protocols", then
660 * the session may have stolen the connection...
661 */
662 if (io != soup_message_get_io_data (msg))
663 return FALSE;
664
665 soup_message_cleanup_response (msg);
666 break;
667 } else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
668 soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
669 /* We must return a status code and response
670 * headers to the client; either an error to
671 * be set by a got-headers handler below, or
672 * else %SOUP_STATUS_CONTINUE otherwise.
673 */
674 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
675 io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
676 } else {
677 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
678
679 /* If the client was waiting for a Continue
680 * but got something else, then it's done
681 * writing.
682 */
683 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
684 io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
685 io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
686 }
687
688 if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
689 SoupMessageHeaders *hdrs =
690 (io->mode == SOUP_MESSAGE_IO_CLIENT) ?
691 msg->response_headers : msg->request_headers;
692 io->read_length = soup_message_headers_get_content_length (hdrs);
693
694 if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
695 !soup_message_is_keepalive (msg)) {
696 /* Some servers suck and send
697 * incorrect Content-Length values, so
698 * allow EOF termination in this case
699 * (iff the message is too short) too.
700 */
701 io->read_encoding = SOUP_ENCODING_EOF;
702 }
703 } else
704 io->read_length = -1;
705
706 soup_message_got_headers (msg);
707 break;
708
709
710 case SOUP_MESSAGE_IO_STATE_BODY_START:
711 if (!io->body_istream) {
712 GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
713 io->read_encoding,
714 io->read_length);
715
716 /* TODO: server-side messages do not have a io->item. This means
717 * that we cannot use content processors for them right now.
718 */
719 if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
720 io->body_istream = soup_message_setup_body_istream (body_istream, msg,
721 io->item->session,
722 SOUP_STAGE_MESSAGE_BODY);
723 g_object_unref (body_istream);
724 } else {
725 io->body_istream = body_istream;
726 }
727 }
728
729 if (soup_message_get_content_sniffer (msg)) {
730 SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
731 const char *content_type;
732 GHashTable *params;
733
734 if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
735 cancellable, error))
736 return FALSE;
737
738 content_type = soup_content_sniffer_stream_sniff (sniffer_stream, ¶ms);
739 soup_message_content_sniffed (msg, content_type, params);
740 }
741
742 io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
743 break;
744
745
746 case SOUP_MESSAGE_IO_STATE_BODY:
747 if (soup_message_has_chunk_allocator (msg)) {
748 buffer = soup_message_allocate_chunk (msg, io->read_length);
749 if (!buffer) {
750 g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
751 soup_message_io_pause (msg);
752 return FALSE;
753 }
754 } else {
755 if (!stack_buf)
756 stack_buf = alloca (RESPONSE_BLOCK_SIZE);
757 buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
758 stack_buf,
759 RESPONSE_BLOCK_SIZE);
760 }
761
762 nread = g_pollable_stream_read (io->body_istream,
763 (guchar *)buffer->data,
764 buffer->length,
765 blocking,
766 cancellable, error);
767 if (nread > 0) {
768 buffer->length = nread;
769 soup_message_body_got_chunk (io->read_body, buffer);
770 soup_message_got_chunk (msg, buffer);
771 soup_buffer_free (buffer);
772 break;
773 }
774
775 soup_buffer_free (buffer);
776 if (nread == -1)
777 return FALSE;
778
779 /* else nread == 0 */
780 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
781 break;
782
783
784 case SOUP_MESSAGE_IO_STATE_BODY_DONE:
785 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
786 soup_message_got_body (msg);
787 break;
788
789
790 case SOUP_MESSAGE_IO_STATE_FINISHING:
791 io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
792
793 if (io->mode == SOUP_MESSAGE_IO_SERVER)
794 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
795 break;
796
797
798 default:
799 g_return_val_if_reached (FALSE);
800 }
801
802 return TRUE;
803 }
804
805 typedef struct {
806 GSource source;
807 SoupMessage *msg;
808 gboolean paused;
809 } SoupMessageSource;
810
811 static gboolean
message_source_check(GSource * source)812 message_source_check (GSource *source)
813 {
814 SoupMessageSource *message_source = (SoupMessageSource *)source;
815
816 if (message_source->paused) {
817 SoupMessageIOData *io = soup_message_get_io_data (message_source->msg);
818
819 if (io && io->paused)
820 return FALSE;
821 else
822 return TRUE;
823 } else
824 return FALSE;
825 }
826
827 static gboolean
message_source_prepare(GSource * source,gint * timeout)828 message_source_prepare (GSource *source,
829 gint *timeout)
830 {
831 *timeout = -1;
832 return message_source_check (source);
833 }
834
835 static gboolean
message_source_dispatch(GSource * source,GSourceFunc callback,gpointer user_data)836 message_source_dispatch (GSource *source,
837 GSourceFunc callback,
838 gpointer user_data)
839 {
840 SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
841 SoupMessageSource *message_source = (SoupMessageSource *)source;
842
843 return (*func) (message_source->msg, user_data);
844 }
845
846 static void
message_source_finalize(GSource * source)847 message_source_finalize (GSource *source)
848 {
849 SoupMessageSource *message_source = (SoupMessageSource *)source;
850
851 g_object_unref (message_source->msg);
852 }
853
854 static gboolean
message_source_closure_callback(SoupMessage * msg,gpointer data)855 message_source_closure_callback (SoupMessage *msg,
856 gpointer data)
857 {
858 GClosure *closure = data;
859 GValue param = G_VALUE_INIT;
860 GValue result_value = G_VALUE_INIT;
861 gboolean result;
862
863 g_value_init (&result_value, G_TYPE_BOOLEAN);
864
865 g_value_init (¶m, SOUP_TYPE_MESSAGE);
866 g_value_set_object (¶m, msg);
867
868 g_closure_invoke (closure, &result_value, 1, ¶m, NULL);
869
870 result = g_value_get_boolean (&result_value);
871 g_value_unset (&result_value);
872 g_value_unset (¶m);
873
874 return result;
875 }
876
877 static GSourceFuncs message_source_funcs =
878 {
879 message_source_prepare,
880 message_source_check,
881 message_source_dispatch,
882 message_source_finalize,
883 (GSourceFunc)message_source_closure_callback,
884 (GSourceDummyMarshal)g_cclosure_marshal_generic,
885 };
886
887 GSource *
soup_message_io_get_source(SoupMessage * msg,GCancellable * cancellable,SoupMessageSourceFunc callback,gpointer user_data)888 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
889 SoupMessageSourceFunc callback, gpointer user_data)
890 {
891 SoupMessageIOData *io = soup_message_get_io_data (msg);
892 GSource *base_source, *source;
893 SoupMessageSource *message_source;
894
895 if (!io) {
896 base_source = g_timeout_source_new (0);
897 } else if (io->paused) {
898 base_source = NULL;
899 } else if (io->async_close_wait) {
900 base_source = g_cancellable_source_new (io->async_close_wait);
901 } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
902 GPollableInputStream *istream;
903
904 if (io->body_istream)
905 istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
906 else
907 istream = G_POLLABLE_INPUT_STREAM (io->istream);
908 base_source = g_pollable_input_stream_create_source (istream, cancellable);
909 } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
910 GPollableOutputStream *ostream;
911
912 if (io->body_ostream)
913 ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
914 else
915 ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
916 base_source = g_pollable_output_stream_create_source (ostream, cancellable);
917 } else
918 base_source = g_timeout_source_new (0);
919
920 source = g_source_new (&message_source_funcs,
921 sizeof (SoupMessageSource));
922 g_source_set_name (source, "SoupMessageSource");
923 message_source = (SoupMessageSource *)source;
924 message_source->msg = g_object_ref (msg);
925 message_source->paused = io && io->paused;
926
927 if (base_source) {
928 g_source_set_dummy_callback (base_source);
929 g_source_add_child_source (source, base_source);
930 g_source_unref (base_source);
931 }
932 g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
933 return source;
934 }
935
936 static gboolean
request_is_restartable(SoupMessage * msg,GError * error)937 request_is_restartable (SoupMessage *msg, GError *error)
938 {
939 SoupMessageIOData *io = soup_message_get_io_data (msg);
940
941 if (!io)
942 return FALSE;
943
944 return (io->mode == SOUP_MESSAGE_IO_CLIENT &&
945 io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
946 io->read_header_buf->len == 0 &&
947 soup_connection_get_ever_used (io->item->conn) &&
948 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
949 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
950 error->domain != G_TLS_ERROR &&
951 SOUP_METHOD_IS_IDEMPOTENT (msg->method));
952 }
953
954 static gboolean
io_run_until(SoupMessage * msg,gboolean blocking,SoupMessageIOState read_state,SoupMessageIOState write_state,GCancellable * cancellable,GError ** error)955 io_run_until (SoupMessage *msg, gboolean blocking,
956 SoupMessageIOState read_state, SoupMessageIOState write_state,
957 GCancellable *cancellable, GError **error)
958 {
959 SoupMessageIOData *io = soup_message_get_io_data (msg);
960 gboolean progress = TRUE, done;
961 GError *my_error = NULL;
962
963 if (g_cancellable_set_error_if_cancelled (cancellable, error))
964 return FALSE;
965 else if (!io) {
966 g_set_error_literal (error, G_IO_ERROR,
967 G_IO_ERROR_CANCELLED,
968 _("Operation was cancelled"));
969 return FALSE;
970 }
971
972 g_object_ref (msg);
973
974 while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait &&
975 (io->read_state < read_state || io->write_state < write_state)) {
976
977 if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
978 progress = io_read (msg, blocking, cancellable, &my_error);
979 else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
980 progress = io_write (msg, blocking, cancellable, &my_error);
981 else
982 progress = FALSE;
983 }
984
985 if (my_error) {
986 if (request_is_restartable (msg, my_error)) {
987 /* Connection got closed, but we can safely try again */
988 g_error_free (my_error);
989 g_set_error_literal (error, SOUP_HTTP_ERROR,
990 SOUP_STATUS_TRY_AGAIN, "");
991 g_object_unref (msg);
992 return FALSE;
993 }
994
995 g_propagate_error (error, my_error);
996 g_object_unref (msg);
997 return FALSE;
998 } else if (soup_message_get_io_data (msg) != io) {
999 g_set_error_literal (error, G_IO_ERROR,
1000 G_IO_ERROR_CANCELLED,
1001 _("Operation was cancelled"));
1002 g_object_unref (msg);
1003 return FALSE;
1004 } else if (!io->async_close_wait &&
1005 g_cancellable_set_error_if_cancelled (cancellable, error)) {
1006 g_object_unref (msg);
1007 return FALSE;
1008 }
1009
1010 done = (io->read_state >= read_state &&
1011 io->write_state >= write_state);
1012
1013 if (!blocking && !done) {
1014 g_set_error_literal (error, G_IO_ERROR,
1015 G_IO_ERROR_WOULD_BLOCK,
1016 _("Operation would block"));
1017 g_object_unref (msg);
1018 return FALSE;
1019 }
1020
1021 g_object_unref (msg);
1022 return done;
1023 }
1024
1025 static gboolean
io_run_ready(SoupMessage * msg,gpointer user_data)1026 io_run_ready (SoupMessage *msg, gpointer user_data)
1027 {
1028 io_run (msg, FALSE);
1029 return FALSE;
1030 }
1031
1032 static void
io_run(SoupMessage * msg,gboolean blocking)1033 io_run (SoupMessage *msg, gboolean blocking)
1034 {
1035 SoupMessageIOData *io = soup_message_get_io_data (msg);
1036 GError *error = NULL;
1037 GCancellable *cancellable;
1038
1039 if (io->io_source) {
1040 g_source_destroy (io->io_source);
1041 g_source_unref (io->io_source);
1042 io->io_source = NULL;
1043 }
1044
1045 g_object_ref (msg);
1046 cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
1047
1048 if (io_run_until (msg, blocking,
1049 SOUP_MESSAGE_IO_STATE_DONE,
1050 SOUP_MESSAGE_IO_STATE_DONE,
1051 cancellable, &error)) {
1052 soup_message_io_finished (msg);
1053 } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
1054 g_clear_error (&error);
1055 io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
1056 g_source_attach (io->io_source, io->async_context);
1057 } else if (error && soup_message_get_io_data (msg) == io) {
1058 if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
1059 io->item->state = SOUP_MESSAGE_RESTARTING;
1060 else if (error->domain == G_TLS_ERROR) {
1061 soup_message_set_status_full (msg,
1062 SOUP_STATUS_SSL_FAILED,
1063 error->message);
1064 } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
1065 soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
1066
1067 g_error_free (error);
1068 soup_message_io_finished (msg);
1069 } else if (error)
1070 g_error_free (error);
1071
1072 g_object_unref (msg);
1073 g_clear_object (&cancellable);
1074 }
1075
1076 gboolean
soup_message_io_run_until_write(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)1077 soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
1078 GCancellable *cancellable, GError **error)
1079 {
1080 return io_run_until (msg, blocking,
1081 SOUP_MESSAGE_IO_STATE_ANY,
1082 SOUP_MESSAGE_IO_STATE_BODY,
1083 cancellable, error);
1084 }
1085
1086 gboolean
soup_message_io_run_until_read(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)1087 soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
1088 GCancellable *cancellable, GError **error)
1089 {
1090 return io_run_until (msg, blocking,
1091 SOUP_MESSAGE_IO_STATE_BODY,
1092 SOUP_MESSAGE_IO_STATE_ANY,
1093 cancellable, error);
1094 }
1095
1096 gboolean
soup_message_io_run_until_finish(SoupMessage * msg,gboolean blocking,GCancellable * cancellable,GError ** error)1097 soup_message_io_run_until_finish (SoupMessage *msg,
1098 gboolean blocking,
1099 GCancellable *cancellable,
1100 GError **error)
1101 {
1102 SoupMessageIOData *io = soup_message_get_io_data (msg);
1103 gboolean success;
1104
1105 g_object_ref (msg);
1106
1107 if (io) {
1108 g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, FALSE);
1109
1110 if (io->read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
1111 io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
1112 }
1113
1114 success = io_run_until (msg, blocking,
1115 SOUP_MESSAGE_IO_STATE_DONE,
1116 SOUP_MESSAGE_IO_STATE_DONE,
1117 cancellable, error);
1118
1119 g_object_unref (msg);
1120 return success;
1121 }
1122
1123 static void
client_stream_eof(SoupClientInputStream * stream,gpointer user_data)1124 client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
1125 {
1126 SoupMessage *msg = user_data;
1127 SoupMessageIOData *io = soup_message_get_io_data (msg);
1128
1129 if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
1130 io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
1131 }
1132
1133 GInputStream *
soup_message_io_get_response_istream(SoupMessage * msg,GError ** error)1134 soup_message_io_get_response_istream (SoupMessage *msg,
1135 GError **error)
1136 {
1137 SoupMessageIOData *io = soup_message_get_io_data (msg);
1138 GInputStream *client_stream;
1139
1140 g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
1141
1142 if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
1143 g_set_error_literal (error, SOUP_HTTP_ERROR,
1144 msg->status_code, msg->reason_phrase);
1145 return NULL;
1146 }
1147
1148 client_stream = soup_client_input_stream_new (io->body_istream, msg);
1149 g_signal_connect (client_stream, "eof",
1150 G_CALLBACK (client_stream_eof), msg);
1151
1152 return client_stream;
1153 }
1154
1155
1156 static SoupMessageIOData *
new_iostate(SoupMessage * msg,GIOStream * iostream,GMainContext * async_context,SoupMessageIOMode mode,SoupMessageGetHeadersFn get_headers_cb,SoupMessageParseHeadersFn parse_headers_cb,gpointer header_data,SoupMessageCompletionFn completion_cb,gpointer completion_data)1157 new_iostate (SoupMessage *msg, GIOStream *iostream,
1158 GMainContext *async_context, SoupMessageIOMode mode,
1159 SoupMessageGetHeadersFn get_headers_cb,
1160 SoupMessageParseHeadersFn parse_headers_cb,
1161 gpointer header_data,
1162 SoupMessageCompletionFn completion_cb,
1163 gpointer completion_data)
1164 {
1165 SoupMessageIOData *io;
1166
1167 io = g_slice_new0 (SoupMessageIOData);
1168 io->mode = mode;
1169 io->get_headers_cb = get_headers_cb;
1170 io->parse_headers_cb = parse_headers_cb;
1171 io->header_data = header_data;
1172 io->completion_cb = completion_cb;
1173 io->completion_data = completion_data;
1174
1175 io->iostream = g_object_ref (iostream);
1176 io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
1177 io->ostream = g_io_stream_get_output_stream (iostream);
1178
1179 if (async_context)
1180 io->async_context = g_main_context_ref (async_context);
1181
1182 io->read_header_buf = g_byte_array_new ();
1183 io->write_buf = g_string_new (NULL);
1184
1185 io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1186 io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
1187
1188 if (soup_message_get_io_data (msg))
1189 soup_message_io_cleanup (msg);
1190 soup_message_set_io_data (msg, io);
1191 return io;
1192 }
1193
1194 void
soup_message_io_client(SoupMessageQueueItem * item,GIOStream * iostream,GMainContext * async_context,SoupMessageGetHeadersFn get_headers_cb,SoupMessageParseHeadersFn parse_headers_cb,gpointer header_data,SoupMessageCompletionFn completion_cb,gpointer completion_data)1195 soup_message_io_client (SoupMessageQueueItem *item,
1196 GIOStream *iostream,
1197 GMainContext *async_context,
1198 SoupMessageGetHeadersFn get_headers_cb,
1199 SoupMessageParseHeadersFn parse_headers_cb,
1200 gpointer header_data,
1201 SoupMessageCompletionFn completion_cb,
1202 gpointer completion_data)
1203 {
1204 SoupMessageIOData *io;
1205
1206 io = new_iostate (item->msg, iostream, async_context,
1207 SOUP_MESSAGE_IO_CLIENT,
1208 get_headers_cb, parse_headers_cb, header_data,
1209 completion_cb, completion_data);
1210
1211 io->item = item;
1212 soup_message_queue_item_ref (item);
1213 io->cancellable = item->cancellable;
1214
1215 io->read_body = item->msg->response_body;
1216 io->write_body = item->msg->request_body;
1217
1218 io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
1219
1220 if (!item->new_api) {
1221 gboolean blocking =
1222 SOUP_IS_SESSION_SYNC (item->session) ||
1223 (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
1224 io_run (item->msg, blocking);
1225 }
1226 }
1227
1228 void
soup_message_io_server(SoupMessage * msg,GIOStream * iostream,GMainContext * async_context,SoupMessageGetHeadersFn get_headers_cb,SoupMessageParseHeadersFn parse_headers_cb,gpointer header_data,SoupMessageCompletionFn completion_cb,gpointer completion_data)1229 soup_message_io_server (SoupMessage *msg,
1230 GIOStream *iostream, GMainContext *async_context,
1231 SoupMessageGetHeadersFn get_headers_cb,
1232 SoupMessageParseHeadersFn parse_headers_cb,
1233 gpointer header_data,
1234 SoupMessageCompletionFn completion_cb,
1235 gpointer completion_data)
1236 {
1237 SoupMessageIOData *io;
1238
1239 io = new_iostate (msg, iostream, async_context,
1240 SOUP_MESSAGE_IO_SERVER,
1241 get_headers_cb, parse_headers_cb, header_data,
1242 completion_cb, completion_data);
1243
1244 io->read_body = msg->request_body;
1245 io->write_body = msg->response_body;
1246
1247 io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
1248 io_run (msg, FALSE);
1249 }
1250
1251 void
soup_message_io_pause(SoupMessage * msg)1252 soup_message_io_pause (SoupMessage *msg)
1253 {
1254 SoupMessageIOData *io = soup_message_get_io_data (msg);
1255
1256 g_return_if_fail (io != NULL);
1257
1258 if (io->item && io->item->new_api)
1259 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1260
1261 if (io->io_source) {
1262 g_source_destroy (io->io_source);
1263 g_source_unref (io->io_source);
1264 io->io_source = NULL;
1265 }
1266
1267 if (io->unpause_source) {
1268 g_source_destroy (io->unpause_source);
1269 g_source_unref (io->unpause_source);
1270 io->unpause_source = NULL;
1271 }
1272
1273 io->paused = TRUE;
1274 }
1275
1276 static gboolean
io_unpause_internal(gpointer msg)1277 io_unpause_internal (gpointer msg)
1278 {
1279 SoupMessageIOData *io = soup_message_get_io_data (msg);
1280
1281 g_return_val_if_fail (io != NULL, FALSE);
1282
1283 g_clear_pointer (&io->unpause_source, g_source_unref);
1284 io->paused = FALSE;
1285
1286 if (io->io_source)
1287 return FALSE;
1288
1289 io_run (msg, FALSE);
1290 return FALSE;
1291 }
1292
1293 void
soup_message_io_unpause(SoupMessage * msg)1294 soup_message_io_unpause (SoupMessage *msg)
1295 {
1296 SoupMessageIOData *io = soup_message_get_io_data (msg);
1297
1298 g_return_if_fail (io != NULL);
1299
1300 if (io->item && io->item->new_api) {
1301 g_return_if_fail (io->read_state < SOUP_MESSAGE_IO_STATE_BODY);
1302 io->paused = FALSE;
1303 return;
1304 }
1305
1306 if (!io->unpause_source) {
1307 io->unpause_source = soup_add_completion_reffed (io->async_context,
1308 io_unpause_internal, msg, NULL);
1309 }
1310 }
1311
1312 /**
1313 * soup_message_io_in_progress:
1314 * @msg: a #SoupMessage
1315 *
1316 * Tests whether or not I/O is currently in progress on @msg.
1317 *
1318 * Return value: whether or not I/O is currently in progress.
1319 **/
1320 gboolean
soup_message_io_in_progress(SoupMessage * msg)1321 soup_message_io_in_progress (SoupMessage *msg)
1322 {
1323 return soup_message_get_io_data (msg) != NULL;
1324 }
1325