1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3 * soup-client-input-stream.c
4 *
5 * Copyright 2010-2012 Red Hat, Inc.
6 */
7
8 #ifdef HAVE_CONFIG_H
9 #include <config.h>
10 #endif
11
12 #include "soup-client-input-stream.h"
13 #include "soup.h"
14 #include "soup-message-private.h"
15
16 struct _SoupClientInputStreamPrivate {
17 SoupMessage *msg;
18 };
19
20 enum {
21 SIGNAL_EOF,
22 LAST_SIGNAL
23 };
24
25 static guint signals[LAST_SIGNAL] = { 0 };
26
/* Property ids used by set_property/get_property.  PROP_0 only occupies
 * the zero slot; PROP_MESSAGE identifies the "message" property.
 */
enum {
	PROP_0 = 0,

	PROP_MESSAGE
};
32
33 static GPollableInputStreamInterface *soup_client_input_stream_parent_pollable_interface;
34 static void soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
35
G_DEFINE_TYPE_WITH_CODE(SoupClientInputStream,soup_client_input_stream,SOUP_TYPE_FILTER_INPUT_STREAM,G_ADD_PRIVATE (SoupClientInputStream)G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,soup_client_input_stream_pollable_init))36 G_DEFINE_TYPE_WITH_CODE (SoupClientInputStream, soup_client_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
37 G_ADD_PRIVATE (SoupClientInputStream)
38 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
39 soup_client_input_stream_pollable_init))
40
41 static void
42 soup_client_input_stream_init (SoupClientInputStream *stream)
43 {
44 stream->priv = soup_client_input_stream_get_instance_private (stream);
45 }
46
47 static void
soup_client_input_stream_finalize(GObject * object)48 soup_client_input_stream_finalize (GObject *object)
49 {
50 SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
51
52 g_clear_object (&cistream->priv->msg);
53
54 G_OBJECT_CLASS (soup_client_input_stream_parent_class)->finalize (object);
55 }
56
57 static void
soup_client_input_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)58 soup_client_input_stream_set_property (GObject *object, guint prop_id,
59 const GValue *value, GParamSpec *pspec)
60 {
61 SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
62
63 switch (prop_id) {
64 case PROP_MESSAGE:
65 cistream->priv->msg = g_value_dup_object (value);
66 break;
67 default:
68 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
69 break;
70 }
71 }
72
73 static void
soup_client_input_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)74 soup_client_input_stream_get_property (GObject *object, guint prop_id,
75 GValue *value, GParamSpec *pspec)
76 {
77 SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
78
79 switch (prop_id) {
80 case PROP_MESSAGE:
81 g_value_set_object (value, cistream->priv->msg);
82 break;
83 default:
84 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
85 break;
86 }
87 }
88
89 static gssize
soup_client_input_stream_read_fn(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)90 soup_client_input_stream_read_fn (GInputStream *stream,
91 void *buffer,
92 gsize count,
93 GCancellable *cancellable,
94 GError **error)
95 {
96 gssize nread;
97
98 nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
99 read_fn (stream, buffer, count, cancellable, error);
100
101 if (nread == 0)
102 g_signal_emit (stream, signals[SIGNAL_EOF], 0);
103
104 return nread;
105 }
106
107 static gssize
soup_client_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)108 soup_client_input_stream_read_nonblocking (GPollableInputStream *stream,
109 void *buffer,
110 gsize count,
111 GError **error)
112 {
113 gssize nread;
114
115 nread = soup_client_input_stream_parent_pollable_interface->
116 read_nonblocking (stream, buffer, count, error);
117
118 if (nread == 0)
119 g_signal_emit (stream, signals[SIGNAL_EOF], 0);
120
121 return nread;
122 }
123
124 static gboolean
soup_client_input_stream_close_fn(GInputStream * stream,GCancellable * cancellable,GError ** error)125 soup_client_input_stream_close_fn (GInputStream *stream,
126 GCancellable *cancellable,
127 GError **error)
128 {
129 SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
130 gboolean success;
131
132 success = soup_message_io_run_until_finish (cistream->priv->msg, TRUE,
133 NULL, error);
134 soup_message_io_finished (cistream->priv->msg);
135 return success;
136 }
137
138 static gboolean
idle_finish_close(gpointer user_data)139 idle_finish_close (gpointer user_data)
140 {
141 GTask *task = user_data;
142
143 g_task_return_boolean (task, TRUE);
144 g_object_unref (task);
145 return FALSE;
146 }
147
148 static gboolean
close_async_ready(SoupMessage * msg,gpointer user_data)149 close_async_ready (SoupMessage *msg, gpointer user_data)
150 {
151 GTask *task = user_data;
152 SoupClientInputStream *cistream = g_task_get_source_object (task);
153 GError *error = NULL;
154
155 if (!soup_message_io_run_until_finish (cistream->priv->msg, FALSE,
156 g_task_get_cancellable (task),
157 &error) &&
158 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
159 g_error_free (error);
160 return TRUE;
161 }
162
163 soup_message_io_finished (cistream->priv->msg);
164
165 if (error) {
166 g_task_return_error (task, error);
167 g_object_unref (task);
168 return FALSE;
169 }
170
171 /* Due to a historical accident, SoupSessionAsync relies on us
172 * waiting one extra cycle after run_until_finish() returns.
173 * Ugh. FIXME later when it's easier to do.
174 */
175 soup_add_idle (g_main_context_get_thread_default (),
176 idle_finish_close, task);
177 return FALSE;
178 }
179
180 static void
soup_client_input_stream_close_async(GInputStream * stream,gint priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)181 soup_client_input_stream_close_async (GInputStream *stream,
182 gint priority,
183 GCancellable *cancellable,
184 GAsyncReadyCallback callback,
185 gpointer user_data)
186 {
187 SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
188 GTask *task;
189 GSource *source;
190
191 task = g_task_new (stream, cancellable, callback, user_data);
192 g_task_set_priority (task, priority);
193
194 if (close_async_ready (cistream->priv->msg, task) == G_SOURCE_CONTINUE) {
195 source = soup_message_io_get_source (cistream->priv->msg,
196 cancellable, NULL, NULL);
197
198 g_task_attach_source (task, source, (GSourceFunc) close_async_ready);
199 g_source_unref (source);
200 }
201 }
202
203 static gboolean
soup_client_input_stream_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)204 soup_client_input_stream_close_finish (GInputStream *stream,
205 GAsyncResult *result,
206 GError **error)
207 {
208 return g_task_propagate_boolean (G_TASK (result), error);
209 }
210
211 static void
soup_client_input_stream_class_init(SoupClientInputStreamClass * stream_class)212 soup_client_input_stream_class_init (SoupClientInputStreamClass *stream_class)
213 {
214 GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
215 GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
216
217 object_class->finalize = soup_client_input_stream_finalize;
218 object_class->set_property = soup_client_input_stream_set_property;
219 object_class->get_property = soup_client_input_stream_get_property;
220
221 input_stream_class->read_fn = soup_client_input_stream_read_fn;
222 input_stream_class->close_fn = soup_client_input_stream_close_fn;
223 input_stream_class->close_async = soup_client_input_stream_close_async;
224 input_stream_class->close_finish = soup_client_input_stream_close_finish;
225
226 signals[SIGNAL_EOF] =
227 g_signal_new ("eof",
228 G_OBJECT_CLASS_TYPE (object_class),
229 G_SIGNAL_RUN_LAST,
230 0,
231 NULL, NULL,
232 NULL,
233 G_TYPE_NONE, 0);
234
235 g_object_class_install_property (
236 object_class, PROP_MESSAGE,
237 g_param_spec_object ("message",
238 "Message",
239 "Message",
240 SOUP_TYPE_MESSAGE,
241 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
242 G_PARAM_STATIC_STRINGS));
243 }
244
245 static void
soup_client_input_stream_pollable_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)246 soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
247 gpointer interface_data)
248 {
249 soup_client_input_stream_parent_pollable_interface =
250 g_type_interface_peek_parent (pollable_interface);
251
252 pollable_interface->read_nonblocking = soup_client_input_stream_read_nonblocking;
253 }
254
255 GInputStream *
soup_client_input_stream_new(GInputStream * base_stream,SoupMessage * msg)256 soup_client_input_stream_new (GInputStream *base_stream,
257 SoupMessage *msg)
258 {
259 return g_object_new (SOUP_TYPE_CLIENT_INPUT_STREAM,
260 "base-stream", base_stream,
261 "message", msg,
262 NULL);
263 }
264