1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3 * Copyright (C) 2012 Igalia, S.L.
4 */
5
6 #ifdef HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9
10 #include <glib/gi18n-lib.h>
11 #include "soup-cache-input-stream.h"
12 #include "soup-message-body.h"
13
/* Property identifiers.  PROP_0 is a placeholder that is never installed;
 * LAST_PROP is one past the last real property. */
enum {
	PROP_0 = 0,
	PROP_OUTPUT_STREAM,
	LAST_PROP
};
22
23 enum {
24 CACHING_FINISHED,
25
26 LAST_SIGNAL
27 };
28
29 static guint signals[LAST_SIGNAL] = { 0 };
30
31 struct _SoupCacheInputStreamPrivate
32 {
33 GOutputStream *output_stream;
34 GCancellable *cancellable;
35 gsize bytes_written;
36
37 gboolean read_finished;
38 SoupBuffer *current_writing_buffer;
39 GQueue *buffer_queue;
40 };
41
42 static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
43
44 G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
45 G_ADD_PRIVATE (SoupCacheInputStream)
46 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
47 soup_cache_input_stream_pollable_init))
48
49
50 static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
51
52 static inline void
notify_and_clear(SoupCacheInputStream * istream,GError * error)53 notify_and_clear (SoupCacheInputStream *istream, GError *error)
54 {
55 SoupCacheInputStreamPrivate *priv = istream->priv;
56
57 g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error);
58
59 g_clear_object (&priv->cancellable);
60 g_clear_object (&priv->output_stream);
61 g_clear_error (&error);
62 }
63
64 static inline void
try_write_next_buffer(SoupCacheInputStream * istream)65 try_write_next_buffer (SoupCacheInputStream *istream)
66 {
67 SoupCacheInputStreamPrivate *priv = istream->priv;
68
69 if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
70 soup_cache_input_stream_write_next_buffer (istream);
71 else if (priv->read_finished)
72 notify_and_clear (istream, NULL);
73 else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
74 GError *error = NULL;
75 g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
76 _("Network stream unexpectedly closed"));
77 notify_and_clear (istream, error);
78 }
79 }
80
81 static void
file_replaced_cb(GObject * source,GAsyncResult * res,gpointer user_data)82 file_replaced_cb (GObject *source,
83 GAsyncResult *res,
84 gpointer user_data)
85 {
86 SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
87 SoupCacheInputStreamPrivate *priv = istream->priv;
88 GError *error = NULL;
89
90 priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
91
92 if (error)
93 notify_and_clear (istream, error);
94 else
95 try_write_next_buffer (istream);
96
97 g_object_unref (istream);
98 }
99
100 static void
soup_cache_input_stream_init(SoupCacheInputStream * self)101 soup_cache_input_stream_init (SoupCacheInputStream *self)
102 {
103 SoupCacheInputStreamPrivate *priv = soup_cache_input_stream_get_instance_private (self);
104
105 priv->buffer_queue = g_queue_new ();
106 self->priv = priv;
107 }
108
109 static void
soup_cache_input_stream_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)110 soup_cache_input_stream_get_property (GObject *object,
111 guint property_id, GValue *value, GParamSpec *pspec)
112 {
113 SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
114 SoupCacheInputStreamPrivate *priv = self->priv;
115
116 switch (property_id) {
117 case PROP_OUTPUT_STREAM:
118 g_value_set_object (value, priv->output_stream);
119 break;
120 default:
121 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
122 break;
123 }
124 }
125
126 static void
soup_cache_input_stream_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)127 soup_cache_input_stream_set_property (GObject *object,
128 guint property_id, const GValue *value, GParamSpec *pspec)
129 {
130 SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
131 SoupCacheInputStreamPrivate *priv = self->priv;
132
133 switch (property_id) {
134 case PROP_OUTPUT_STREAM:
135 priv->output_stream = g_value_dup_object (value);
136 break;
137 default:
138 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
139 break;
140 }
141 }
142
143 static void
soup_cache_input_stream_finalize(GObject * object)144 soup_cache_input_stream_finalize (GObject *object)
145 {
146 SoupCacheInputStream *self = (SoupCacheInputStream *)object;
147 SoupCacheInputStreamPrivate *priv = self->priv;
148
149 g_clear_object (&priv->cancellable);
150 g_clear_object (&priv->output_stream);
151 g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
152 g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
153
154 G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
155 }
156
157 static void
write_ready_cb(GObject * source,GAsyncResult * result,SoupCacheInputStream * istream)158 write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
159 {
160 GOutputStream *ostream = G_OUTPUT_STREAM (source);
161 SoupCacheInputStreamPrivate *priv = istream->priv;
162 gssize write_size;
163 gsize pending;
164 GError *error = NULL;
165
166 write_size = g_output_stream_write_finish (ostream, result, &error);
167 if (error) {
168 notify_and_clear (istream, error);
169 g_object_unref (istream);
170 return;
171 }
172
173 /* Check that we have written everything */
174 pending = priv->current_writing_buffer->length - write_size;
175 if (pending) {
176 SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
177 write_size, pending);
178 g_queue_push_head (priv->buffer_queue, subbuffer);
179 }
180
181 priv->bytes_written += write_size;
182 g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
183
184 try_write_next_buffer (istream);
185 g_object_unref (istream);
186 }
187
188 static void
soup_cache_input_stream_write_next_buffer(SoupCacheInputStream * istream)189 soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
190 {
191 SoupCacheInputStreamPrivate *priv = istream->priv;
192 SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
193 int priority;
194
195 g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
196
197 g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
198 priv->current_writing_buffer = buffer;
199
200 if (priv->buffer_queue->length > 10)
201 priority = G_PRIORITY_DEFAULT;
202 else
203 priority = G_PRIORITY_LOW;
204
205 g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
206 priority, priv->cancellable,
207 (GAsyncReadyCallback) write_ready_cb,
208 g_object_ref (istream));
209 }
210
211 static gssize
read_internal(GInputStream * stream,void * buffer,gsize count,gboolean blocking,GCancellable * cancellable,GError ** error)212 read_internal (GInputStream *stream,
213 void *buffer,
214 gsize count,
215 gboolean blocking,
216 GCancellable *cancellable,
217 GError **error)
218 {
219 SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
220 SoupCacheInputStreamPrivate *priv = istream->priv;
221 GInputStream *base_stream;
222 gssize nread;
223
224 base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
225 nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
226 cancellable, error);
227
228 if (G_UNLIKELY (nread == -1 || priv->read_finished))
229 return nread;
230
231 if (nread == 0) {
232 priv->read_finished = TRUE;
233
234 if (priv->current_writing_buffer == NULL && priv->output_stream)
235 notify_and_clear (istream, NULL);
236 } else {
237 SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
238 g_queue_push_tail (priv->buffer_queue, soup_buffer);
239
240 if (priv->current_writing_buffer == NULL && priv->output_stream)
241 soup_cache_input_stream_write_next_buffer (istream);
242 }
243
244 return nread;
245 }
246
247 static gssize
soup_cache_input_stream_read_fn(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)248 soup_cache_input_stream_read_fn (GInputStream *stream,
249 void *buffer,
250 gsize count,
251 GCancellable *cancellable,
252 GError **error)
253 {
254 return read_internal (stream, buffer, count, TRUE,
255 cancellable, error);
256 }
257
258 static gssize
soup_cache_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)259 soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream,
260 void *buffer,
261 gsize count,
262 GError **error)
263 {
264 return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
265 NULL, error);
266 }
267
268 static void
soup_cache_input_stream_pollable_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)269 soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
270 gpointer interface_data)
271 {
272 pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
273 }
274
275 static gboolean
soup_cache_input_stream_close_fn(GInputStream * stream,GCancellable * cancellable,GError ** error)276 soup_cache_input_stream_close_fn (GInputStream *stream,
277 GCancellable *cancellable,
278 GError **error)
279 {
280 SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
281 SoupCacheInputStreamPrivate *priv = istream->priv;
282
283 if (!priv->read_finished) {
284 if (priv->output_stream) {
285 /* Cancel any pending write operation or return an error if none. */
286 if (g_output_stream_has_pending (priv->output_stream))
287 g_cancellable_cancel (priv->cancellable);
288 else {
289 GError *notify_error = NULL;
290 g_set_error_literal (¬ify_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
291 _("Failed to completely cache the resource"));
292 notify_and_clear (istream, notify_error);
293 }
294 } else if (priv->cancellable)
295 /* The file_replace_async() hasn't finished yet */
296 g_cancellable_cancel (priv->cancellable);
297 }
298
299 return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
300 }
301
302 static void
soup_cache_input_stream_class_init(SoupCacheInputStreamClass * klass)303 soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
304 {
305 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
306 GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
307
308 gobject_class->get_property = soup_cache_input_stream_get_property;
309 gobject_class->set_property = soup_cache_input_stream_set_property;
310 gobject_class->finalize = soup_cache_input_stream_finalize;
311
312 istream_class->read_fn = soup_cache_input_stream_read_fn;
313 istream_class->close_fn = soup_cache_input_stream_close_fn;
314
315 g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
316 g_param_spec_object ("output-stream", "Output stream",
317 "the output stream where to write.",
318 G_TYPE_OUTPUT_STREAM,
319 G_PARAM_READWRITE |
320 G_PARAM_CONSTRUCT_ONLY |
321 G_PARAM_STATIC_STRINGS));
322
323 signals[CACHING_FINISHED] =
324 g_signal_new ("caching-finished",
325 G_OBJECT_CLASS_TYPE (gobject_class),
326 G_SIGNAL_RUN_FIRST,
327 G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
328 NULL, NULL,
329 NULL,
330 G_TYPE_NONE, 2,
331 G_TYPE_INT, G_TYPE_ERROR);
332 }
333
334 GInputStream *
soup_cache_input_stream_new(GInputStream * base_stream,GFile * file)335 soup_cache_input_stream_new (GInputStream *base_stream,
336 GFile *file)
337 {
338 SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
339 "base-stream", base_stream,
340 "close-base-stream", FALSE,
341 NULL);
342
343 istream->priv->cancellable = g_cancellable_new ();
344 g_file_replace_async (file, NULL, FALSE,
345 G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
346 G_PRIORITY_DEFAULT, istream->priv->cancellable,
347 file_replaced_cb, g_object_ref (istream));
348
349 return (GInputStream *) istream;
350 }
351