• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2012 Igalia, S.L.
4  */
5 
6 #ifdef HAVE_CONFIG_H
7 #include <config.h>
8 #endif
9 
10 #include <glib/gi18n-lib.h>
11 #include "soup-cache-input-stream.h"
12 #include "soup-message-body.h"
13 
14 /* properties */
15 enum {
16 	PROP_0,
17 
18 	PROP_OUTPUT_STREAM,
19 
20 	LAST_PROP
21 };
22 
23 enum {
24 	CACHING_FINISHED,
25 
26 	LAST_SIGNAL
27 };
28 
29 static guint signals[LAST_SIGNAL] = { 0 };
30 
31 struct _SoupCacheInputStreamPrivate
32 {
33 	GOutputStream *output_stream;
34 	GCancellable *cancellable;
35 	gsize bytes_written;
36 
37 	gboolean read_finished;
38 	SoupBuffer *current_writing_buffer;
39 	GQueue *buffer_queue;
40 };
41 
42 static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
43 
44 G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
45                          G_ADD_PRIVATE (SoupCacheInputStream)
46 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
47 						soup_cache_input_stream_pollable_init))
48 
49 
50 static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
51 
52 static inline void
notify_and_clear(SoupCacheInputStream * istream,GError * error)53 notify_and_clear (SoupCacheInputStream *istream, GError *error)
54 {
55 	SoupCacheInputStreamPrivate *priv = istream->priv;
56 
57 	g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error);
58 
59 	g_clear_object (&priv->cancellable);
60 	g_clear_object (&priv->output_stream);
61 	g_clear_error (&error);
62 }
63 
64 static inline void
try_write_next_buffer(SoupCacheInputStream * istream)65 try_write_next_buffer (SoupCacheInputStream *istream)
66 {
67 	SoupCacheInputStreamPrivate *priv = istream->priv;
68 
69 	if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
70 		soup_cache_input_stream_write_next_buffer (istream);
71 	else if (priv->read_finished)
72 		notify_and_clear (istream, NULL);
73 	else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
74 		GError *error = NULL;
75 		g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
76 				     _("Network stream unexpectedly closed"));
77 		notify_and_clear (istream, error);
78 	}
79 }
80 
81 static void
file_replaced_cb(GObject * source,GAsyncResult * res,gpointer user_data)82 file_replaced_cb (GObject      *source,
83 		  GAsyncResult *res,
84 		  gpointer      user_data)
85 {
86 	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
87 	SoupCacheInputStreamPrivate *priv = istream->priv;
88 	GError *error = NULL;
89 
90 	priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
91 
92 	if (error)
93 		notify_and_clear (istream, error);
94 	else
95 		try_write_next_buffer (istream);
96 
97 	g_object_unref (istream);
98 }
99 
100 static void
soup_cache_input_stream_init(SoupCacheInputStream * self)101 soup_cache_input_stream_init (SoupCacheInputStream *self)
102 {
103 	SoupCacheInputStreamPrivate *priv = soup_cache_input_stream_get_instance_private (self);
104 
105 	priv->buffer_queue = g_queue_new ();
106 	self->priv = priv;
107 }
108 
109 static void
soup_cache_input_stream_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)110 soup_cache_input_stream_get_property (GObject *object,
111 				      guint property_id, GValue *value, GParamSpec *pspec)
112 {
113 	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
114 	SoupCacheInputStreamPrivate *priv = self->priv;
115 
116 	switch (property_id) {
117 	case PROP_OUTPUT_STREAM:
118 		g_value_set_object (value, priv->output_stream);
119 		break;
120 	default:
121 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
122 		break;
123 	}
124 }
125 
126 static void
soup_cache_input_stream_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)127 soup_cache_input_stream_set_property (GObject *object,
128 				      guint property_id, const GValue *value, GParamSpec *pspec)
129 {
130 	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
131 	SoupCacheInputStreamPrivate *priv = self->priv;
132 
133 	switch (property_id) {
134 	case PROP_OUTPUT_STREAM:
135 		priv->output_stream = g_value_dup_object (value);
136 		break;
137 	default:
138 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
139 		break;
140 	}
141 }
142 
143 static void
soup_cache_input_stream_finalize(GObject * object)144 soup_cache_input_stream_finalize (GObject *object)
145 {
146 	SoupCacheInputStream *self = (SoupCacheInputStream *)object;
147 	SoupCacheInputStreamPrivate *priv = self->priv;
148 
149 	g_clear_object (&priv->cancellable);
150 	g_clear_object (&priv->output_stream);
151 	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
152 	g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
153 
154 	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
155 }
156 
157 static void
write_ready_cb(GObject * source,GAsyncResult * result,SoupCacheInputStream * istream)158 write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
159 {
160 	GOutputStream *ostream = G_OUTPUT_STREAM (source);
161 	SoupCacheInputStreamPrivate *priv = istream->priv;
162 	gssize write_size;
163 	gsize pending;
164 	GError *error = NULL;
165 
166 	write_size = g_output_stream_write_finish (ostream, result, &error);
167 	if (error) {
168 		notify_and_clear (istream, error);
169 		g_object_unref (istream);
170 		return;
171 	}
172 
173 	/* Check that we have written everything */
174 	pending = priv->current_writing_buffer->length - write_size;
175 	if (pending) {
176 		SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
177 								   write_size, pending);
178 		g_queue_push_head (priv->buffer_queue, subbuffer);
179 	}
180 
181 	priv->bytes_written += write_size;
182 	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
183 
184 	try_write_next_buffer (istream);
185 	g_object_unref (istream);
186 }
187 
188 static void
soup_cache_input_stream_write_next_buffer(SoupCacheInputStream * istream)189 soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
190 {
191 	SoupCacheInputStreamPrivate *priv = istream->priv;
192 	SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
193 	int priority;
194 
195 	g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
196 
197 	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
198 	priv->current_writing_buffer = buffer;
199 
200 	if (priv->buffer_queue->length > 10)
201 		priority = G_PRIORITY_DEFAULT;
202 	else
203 		priority = G_PRIORITY_LOW;
204 
205 	g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
206 				     priority, priv->cancellable,
207 				     (GAsyncReadyCallback) write_ready_cb,
208 				     g_object_ref (istream));
209 }
210 
211 static gssize
read_internal(GInputStream * stream,void * buffer,gsize count,gboolean blocking,GCancellable * cancellable,GError ** error)212 read_internal (GInputStream  *stream,
213 	       void          *buffer,
214 	       gsize          count,
215 	       gboolean       blocking,
216 	       GCancellable  *cancellable,
217 	       GError       **error)
218 {
219 	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
220 	SoupCacheInputStreamPrivate *priv = istream->priv;
221 	GInputStream *base_stream;
222 	gssize nread;
223 
224 	base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
225 	nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
226 					cancellable, error);
227 
228 	if (G_UNLIKELY (nread == -1 || priv->read_finished))
229 		return nread;
230 
231 	if (nread == 0) {
232 		priv->read_finished = TRUE;
233 
234 		if (priv->current_writing_buffer == NULL && priv->output_stream)
235 			notify_and_clear (istream, NULL);
236 	} else {
237 		SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
238 		g_queue_push_tail (priv->buffer_queue, soup_buffer);
239 
240 		if (priv->current_writing_buffer == NULL && priv->output_stream)
241 			soup_cache_input_stream_write_next_buffer (istream);
242 	}
243 
244 	return nread;
245 }
246 
247 static gssize
soup_cache_input_stream_read_fn(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)248 soup_cache_input_stream_read_fn (GInputStream  *stream,
249 				 void          *buffer,
250 				 gsize          count,
251 				 GCancellable  *cancellable,
252 				 GError       **error)
253 {
254 	return read_internal (stream, buffer, count, TRUE,
255 			      cancellable, error);
256 }
257 
258 static gssize
soup_cache_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)259 soup_cache_input_stream_read_nonblocking (GPollableInputStream  *stream,
260 					  void                  *buffer,
261 					  gsize                  count,
262 					  GError               **error)
263 {
264 	return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
265 			      NULL, error);
266 }
267 
268 static void
soup_cache_input_stream_pollable_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)269 soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
270 				       gpointer interface_data)
271 {
272 	pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
273 }
274 
275 static gboolean
soup_cache_input_stream_close_fn(GInputStream * stream,GCancellable * cancellable,GError ** error)276 soup_cache_input_stream_close_fn (GInputStream  *stream,
277 				  GCancellable  *cancellable,
278 				  GError       **error)
279 {
280 	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
281 	SoupCacheInputStreamPrivate *priv = istream->priv;
282 
283 	if (!priv->read_finished) {
284 		if (priv->output_stream) {
285 			/* Cancel any pending write operation or return an error if none. */
286 			if (g_output_stream_has_pending (priv->output_stream))
287 				g_cancellable_cancel (priv->cancellable);
288 			else {
289 				GError *notify_error = NULL;
290 				g_set_error_literal (&notify_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
291 						     _("Failed to completely cache the resource"));
292 				notify_and_clear (istream, notify_error);
293 			}
294 		} else if (priv->cancellable)
295 			/* The file_replace_async() hasn't finished yet */
296 			g_cancellable_cancel (priv->cancellable);
297 	}
298 
299 	return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
300 }
301 
302 static void
soup_cache_input_stream_class_init(SoupCacheInputStreamClass * klass)303 soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
304 {
305 	GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
306 	GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
307 
308 	gobject_class->get_property = soup_cache_input_stream_get_property;
309 	gobject_class->set_property = soup_cache_input_stream_set_property;
310 	gobject_class->finalize = soup_cache_input_stream_finalize;
311 
312 	istream_class->read_fn = soup_cache_input_stream_read_fn;
313 	istream_class->close_fn = soup_cache_input_stream_close_fn;
314 
315 	g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
316 					 g_param_spec_object ("output-stream", "Output stream",
317 							      "the output stream where to write.",
318 							      G_TYPE_OUTPUT_STREAM,
319 							      G_PARAM_READWRITE |
320 							      G_PARAM_CONSTRUCT_ONLY |
321 							      G_PARAM_STATIC_STRINGS));
322 
323 	signals[CACHING_FINISHED] =
324 		g_signal_new ("caching-finished",
325 			      G_OBJECT_CLASS_TYPE (gobject_class),
326 			      G_SIGNAL_RUN_FIRST,
327 			      G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
328 			      NULL, NULL,
329 			      NULL,
330 			      G_TYPE_NONE, 2,
331 			      G_TYPE_INT, G_TYPE_ERROR);
332 }
333 
334 GInputStream *
soup_cache_input_stream_new(GInputStream * base_stream,GFile * file)335 soup_cache_input_stream_new (GInputStream *base_stream,
336 			     GFile        *file)
337 {
338 	SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
339 					      "base-stream", base_stream,
340 					      "close-base-stream", FALSE,
341 					      NULL);
342 
343 	istream->priv->cancellable = g_cancellable_new ();
344 	g_file_replace_async (file, NULL, FALSE,
345 			      G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
346 			      G_PRIORITY_DEFAULT, istream->priv->cancellable,
347 			      file_replaced_cb, g_object_ref (istream));
348 
349 	return (GInputStream *) istream;
350 }
351