• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-filter-input-stream.c
4  *
5  * Copyright 2012 Red Hat, Inc.
6  */
7 
8 #ifdef HAVE_CONFIG_H
9 #include <config.h>
10 #endif
11 
12 #include <string.h>
13 
14 #include "soup-filter-input-stream.h"
15 #include "soup.h"
16 
17 /* This is essentially a subset of GDataInputStream, except that we
18  * can do the equivalent of "fill_nonblocking()" on it. (We could use
19  * an actual GDataInputStream, and implement the nonblocking semantics
20  * via fill_async(), but that would be more work...)
21  */
22 
23 struct _SoupFilterInputStreamPrivate {
24 	GByteArray *buf;
25 	gboolean need_more;
26 	gboolean in_read_until;
27 };
28 
29 static void soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
30 
G_DEFINE_TYPE_WITH_CODE(SoupFilterInputStream,soup_filter_input_stream,G_TYPE_FILTER_INPUT_STREAM,G_ADD_PRIVATE (SoupFilterInputStream)G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,soup_filter_input_stream_pollable_init))31 G_DEFINE_TYPE_WITH_CODE (SoupFilterInputStream, soup_filter_input_stream, G_TYPE_FILTER_INPUT_STREAM,
32                          G_ADD_PRIVATE (SoupFilterInputStream)
33 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
34 						soup_filter_input_stream_pollable_init))
35 
36 static void
37 soup_filter_input_stream_init (SoupFilterInputStream *stream)
38 {
39 	stream->priv = soup_filter_input_stream_get_instance_private (stream);
40 }
41 
42 static void
soup_filter_input_stream_finalize(GObject * object)43 soup_filter_input_stream_finalize (GObject *object)
44 {
45 	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (object);
46 
47 	g_clear_pointer (&fstream->priv->buf, g_byte_array_unref);
48 
49 	G_OBJECT_CLASS (soup_filter_input_stream_parent_class)->finalize (object);
50 }
51 
52 static gssize
read_from_buf(SoupFilterInputStream * fstream,gpointer buffer,gsize count)53 read_from_buf (SoupFilterInputStream *fstream, gpointer buffer, gsize count)
54 {
55 	GByteArray *buf = fstream->priv->buf;
56 
57 	if (buf->len < count)
58 		count = buf->len;
59 	memcpy (buffer, buf->data, count);
60 
61 	if (count == buf->len) {
62 		g_byte_array_free (buf, TRUE);
63 		fstream->priv->buf = NULL;
64 	} else {
65 		memmove (buf->data, buf->data + count,
66 			 buf->len - count);
67 		g_byte_array_set_size (buf, buf->len - count);
68 	}
69 
70 	return count;
71 }
72 
73 static gssize
soup_filter_input_stream_read_fn(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)74 soup_filter_input_stream_read_fn (GInputStream  *stream,
75 				  void          *buffer,
76 				  gsize          count,
77 				  GCancellable  *cancellable,
78 				  GError       **error)
79 {
80 	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
81 
82 	if (!fstream->priv->in_read_until)
83 		fstream->priv->need_more = FALSE;
84 
85 	if (fstream->priv->buf && !fstream->priv->in_read_until) {
86 		return read_from_buf (fstream, buffer, count);
87 	} else {
88 		return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
89 					       buffer, count,
90 					       TRUE, cancellable, error);
91 	}
92 }
93 
94 static gboolean
soup_filter_input_stream_is_readable(GPollableInputStream * stream)95 soup_filter_input_stream_is_readable (GPollableInputStream *stream)
96 {
97 	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
98 
99 	if (fstream->priv->buf && !fstream->priv->need_more)
100 		return TRUE;
101 	else
102 		return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream));
103 }
104 
105 static gssize
soup_filter_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)106 soup_filter_input_stream_read_nonblocking (GPollableInputStream  *stream,
107 					   void                  *buffer,
108 					   gsize                  count,
109 					   GError               **error)
110 {
111 	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
112 
113 	if (!fstream->priv->in_read_until)
114 		fstream->priv->need_more = FALSE;
115 
116 	if (fstream->priv->buf && !fstream->priv->in_read_until) {
117 		return read_from_buf (fstream, buffer, count);
118 	} else {
119 		return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
120 					       buffer, count,
121 					       FALSE, NULL, error);
122 	}
123 }
124 
125 static GSource *
soup_filter_input_stream_create_source(GPollableInputStream * stream,GCancellable * cancellable)126 soup_filter_input_stream_create_source (GPollableInputStream *stream,
127 					GCancellable         *cancellable)
128 {
129 	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
130 	GSource *base_source, *pollable_source;
131 
132 	if (fstream->priv->buf && !fstream->priv->need_more)
133 		base_source = g_timeout_source_new (0);
134 	else
135 		base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream), cancellable);
136 
137 	g_source_set_dummy_callback (base_source);
138 	pollable_source = g_pollable_source_new (G_OBJECT (stream));
139 	g_source_add_child_source (pollable_source, base_source);
140 	g_source_unref (base_source);
141 
142 	return pollable_source;
143 }
144 
145 static void
soup_filter_input_stream_class_init(SoupFilterInputStreamClass * stream_class)146 soup_filter_input_stream_class_init (SoupFilterInputStreamClass *stream_class)
147 {
148 	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
149 	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
150 
151 	object_class->finalize = soup_filter_input_stream_finalize;
152 
153 	input_stream_class->read_fn = soup_filter_input_stream_read_fn;
154 }
155 
156 static void
soup_filter_input_stream_pollable_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)157 soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
158 					gpointer                       interface_data)
159 {
160 	pollable_interface->is_readable = soup_filter_input_stream_is_readable;
161 	pollable_interface->read_nonblocking = soup_filter_input_stream_read_nonblocking;
162 	pollable_interface->create_source = soup_filter_input_stream_create_source;
163 }
164 
165 GInputStream *
soup_filter_input_stream_new(GInputStream * base_stream)166 soup_filter_input_stream_new (GInputStream *base_stream)
167 {
168 	return g_object_new (SOUP_TYPE_FILTER_INPUT_STREAM,
169 			     "base-stream", base_stream,
170 			     "close-base-stream", FALSE,
171 			     NULL);
172 }
173 
174 gssize
soup_filter_input_stream_read_line(SoupFilterInputStream * fstream,void * buffer,gsize length,gboolean blocking,gboolean * got_line,GCancellable * cancellable,GError ** error)175 soup_filter_input_stream_read_line (SoupFilterInputStream  *fstream,
176 				    void                   *buffer,
177 				    gsize                   length,
178 				    gboolean                blocking,
179 				    gboolean               *got_line,
180 				    GCancellable           *cancellable,
181 				    GError                **error)
182 {
183 	return soup_filter_input_stream_read_until (fstream, buffer, length,
184 						    "\n", 1, blocking,
185 						    TRUE, got_line,
186 						    cancellable, error);
187 }
188 
189 gssize
soup_filter_input_stream_read_until(SoupFilterInputStream * fstream,void * buffer,gsize length,const void * boundary,gsize boundary_length,gboolean blocking,gboolean include_boundary,gboolean * got_boundary,GCancellable * cancellable,GError ** error)190 soup_filter_input_stream_read_until (SoupFilterInputStream  *fstream,
191 				     void                   *buffer,
192 				     gsize                   length,
193 				     const void             *boundary,
194 				     gsize                   boundary_length,
195 				     gboolean                blocking,
196 				     gboolean                include_boundary,
197 				     gboolean               *got_boundary,
198 				     GCancellable           *cancellable,
199 				     GError                **error)
200 {
201 	gssize nread, read_length;
202 	guint8 *p, *buf, *end;
203 	gboolean eof = FALSE;
204 	GError *my_error = NULL;
205 
206 	g_return_val_if_fail (SOUP_IS_FILTER_INPUT_STREAM (fstream), -1);
207 	g_return_val_if_fail (!include_boundary || (boundary_length < length), -1);
208 
209 	*got_boundary = FALSE;
210 	fstream->priv->need_more = FALSE;
211 
212 	if (!fstream->priv->buf || fstream->priv->buf->len < boundary_length) {
213 		guint prev_len;
214 
215 	fill_buffer:
216 		if (!fstream->priv->buf)
217 			fstream->priv->buf = g_byte_array_new ();
218 		prev_len = fstream->priv->buf->len;
219 		g_byte_array_set_size (fstream->priv->buf, length);
220 		buf = fstream->priv->buf->data;
221 
222 		fstream->priv->in_read_until = TRUE;
223 		nread = g_pollable_stream_read (G_INPUT_STREAM (fstream),
224 						buf + prev_len, length - prev_len,
225 						blocking,
226 						cancellable, &my_error);
227 		fstream->priv->in_read_until = FALSE;
228 		if (nread <= 0) {
229 			if (prev_len)
230 				fstream->priv->buf->len = prev_len;
231 			else {
232 				g_byte_array_free (fstream->priv->buf, TRUE);
233 				fstream->priv->buf = NULL;
234 			}
235 
236 			if (nread == 0 && prev_len)
237 				eof = TRUE;
238 			else {
239 				if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
240 					fstream->priv->need_more = TRUE;
241 				if (my_error)
242 					g_propagate_error (error, my_error);
243 
244 				return nread;
245 			}
246 
247 			if (my_error)
248 				g_propagate_error (error, my_error);
249 		} else
250 			fstream->priv->buf->len = prev_len + nread;
251 	} else
252 		buf = fstream->priv->buf->data;
253 
254 	/* Scan for the boundary within the range we can possibly return. */
255 	if (include_boundary)
256 		end = buf + MIN (fstream->priv->buf->len, length) - boundary_length;
257 	else
258 		end = buf + MIN (fstream->priv->buf->len - boundary_length, length);
259 	for (p = buf; p <= end; p++) {
260 		if (*p == *(guint8*)boundary &&
261 		    !memcmp (p, boundary, boundary_length)) {
262 			if (include_boundary)
263 				p += boundary_length;
264 			*got_boundary = TRUE;
265 			break;
266 		}
267 	}
268 
269 	if (!*got_boundary && fstream->priv->buf->len < length && !eof)
270 		goto fill_buffer;
271 
272 	if (eof && !*got_boundary)
273 		read_length = MIN (fstream->priv->buf->len, length);
274 	else
275 		read_length = p - buf;
276 	return read_from_buf (fstream, buffer, read_length);
277 }
278