• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-message-queue.c: Message queue
4  *
5  * Copyright (C) 2003 Novell, Inc.
6  * Copyright (C) 2008 Red Hat, Inc.
7  */
8 
9 #ifdef HAVE_CONFIG_H
10 #include <config.h>
11 #endif
12 
13 #include "soup-message-queue.h"
14 #include "soup.h"
15 
16 /* This is an internal structure used by #SoupSession and its
17  * subclasses to keep track of the status of messages currently being
18  * processed.
19  *
20  * The #SoupMessageQueue itself is mostly just a linked list of
21  * #SoupMessageQueueItem, with some added cleverness to allow the list
22  * to be walked safely while other threads / re-entrant loops are
23  * adding items to and removing items from it. In particular, this is
24  * handled by refcounting items and then keeping "removed" items in
25  * the list until their ref_count drops to 0, but skipping over the
26  * "removed" ones when walking the queue.
27  **/
28 
29 struct _SoupMessageQueue {
30 	SoupSession *session;
31 
32 	GMutex mutex;
33 	SoupMessageQueueItem *head, *tail;
34 };
35 
36 SoupMessageQueue *
soup_message_queue_new(SoupSession * session)37 soup_message_queue_new (SoupSession *session)
38 {
39 	SoupMessageQueue *queue;
40 
41 	queue = g_slice_new0 (SoupMessageQueue);
42 	queue->session = session;
43 	g_mutex_init (&queue->mutex);
44 	return queue;
45 }
46 
47 void
soup_message_queue_destroy(SoupMessageQueue * queue)48 soup_message_queue_destroy (SoupMessageQueue *queue)
49 {
50 	g_return_if_fail (queue->head == NULL);
51 
52 	g_mutex_clear (&queue->mutex);
53 	g_slice_free (SoupMessageQueue, queue);
54 }
55 
56 static void
queue_message_restarted(SoupMessage * msg,gpointer user_data)57 queue_message_restarted (SoupMessage *msg, gpointer user_data)
58 {
59 	SoupMessageQueueItem *item = user_data;
60 
61 	g_cancellable_reset (item->cancellable);
62 }
63 
64 /**
65  * soup_message_queue_append:
66  * @queue: a #SoupMessageQueue
67  * @msg: a #SoupMessage
68  * @callback: the callback for @msg
69  * @user_data: the data to pass to @callback
70  *
71  * Creates a new #SoupMessageQueueItem and appends it to @queue.
72  *
73  * Return value: the new item, which you must unref with
74  * soup_message_queue_unref_item() when you are done with.
75  **/
76 SoupMessageQueueItem *
soup_message_queue_append(SoupMessageQueue * queue,SoupMessage * msg,SoupSessionCallback callback,gpointer user_data)77 soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
78 			   SoupSessionCallback callback, gpointer user_data)
79 {
80 	SoupMessageQueueItem *item;
81 
82 	item = g_slice_new0 (SoupMessageQueueItem);
83 	item->session = g_object_ref (queue->session);
84 	item->async_context = soup_session_get_async_context (item->session);
85 	if (item->async_context)
86 		g_main_context_ref (item->async_context);
87 	item->queue = queue;
88 	item->msg = g_object_ref (msg);
89 	item->callback = callback;
90 	item->callback_data = user_data;
91 	item->cancellable = g_cancellable_new ();
92 	item->priority = soup_message_get_priority (msg);
93 
94 	g_signal_connect (msg, "restarted",
95 			  G_CALLBACK (queue_message_restarted), item);
96 
97 	/* Note: the initial ref_count of 1 represents the caller's
98 	 * ref; the queue's own ref is indicated by the absence of the
99 	 * "removed" flag.
100 	 */
101 	item->ref_count = 1;
102 
103 	g_mutex_lock (&queue->mutex);
104 	if (queue->head) {
105 		SoupMessageQueueItem *it = queue->head;
106 
107 		while (it && it->priority >= item->priority)
108 			it = it->next;
109 
110 		if (!it) {
111 			if (queue->tail) {
112 				queue->tail->next = item;
113 				item->prev = queue->tail;
114 			} else
115 				queue->head = item;
116 			queue->tail = item;
117 		} else {
118 			if (it != queue->head)
119 				it->prev->next = item;
120 			else
121 				queue->head = item;
122 			item->prev = it->prev;
123 			it->prev = item;
124 			item->next = it;
125 		}
126 	} else
127 		queue->head = queue->tail = item;
128 
129 	g_mutex_unlock (&queue->mutex);
130 	return item;
131 }
132 
133 /**
134  * soup_message_queue_item_ref:
135  * @item: a #SoupMessageQueueItem
136  *
137  * Refs @item.
138  **/
139 void
soup_message_queue_item_ref(SoupMessageQueueItem * item)140 soup_message_queue_item_ref (SoupMessageQueueItem *item)
141 {
142 	g_mutex_lock (&item->queue->mutex);
143 	item->ref_count++;
144 	g_mutex_unlock (&item->queue->mutex);
145 }
146 
147 /**
148  * soup_message_queue_item_unref:
149  * @item: a #SoupMessageQueueItem
150  *
151  * Unrefs @item; use this on a #SoupMessageQueueItem that you are done
152  * with (but that you aren't passing to
153  * soup_message_queue_item_next()).
154  **/
155 void
soup_message_queue_item_unref(SoupMessageQueueItem * item)156 soup_message_queue_item_unref (SoupMessageQueueItem *item)
157 {
158 	g_mutex_lock (&item->queue->mutex);
159 
160 	/* Decrement the ref_count; if it's still non-zero OR if the
161 	 * item is still in the queue, then return.
162 	 */
163 	if (--item->ref_count || !item->removed) {
164 		g_mutex_unlock (&item->queue->mutex);
165 		return;
166 	}
167 
168 	g_warn_if_fail (item->conn == NULL);
169 
170 	/* OK, @item is dead. Rewrite @queue around it */
171 	if (item->prev)
172 		item->prev->next = item->next;
173 	else
174 		item->queue->head = item->next;
175 	if (item->next)
176 		item->next->prev = item->prev;
177 	else
178 		item->queue->tail = item->prev;
179 
180 	g_mutex_unlock (&item->queue->mutex);
181 
182 	/* And free it */
183 	g_signal_handlers_disconnect_by_func (item->msg,
184 					      queue_message_restarted, item);
185 	g_object_unref (item->session);
186 	g_object_unref (item->msg);
187 	g_object_unref (item->cancellable);
188 	g_clear_error (&item->error);
189 	g_clear_object (&item->task);
190 	g_clear_pointer (&item->async_context, g_main_context_unref);
191 	if (item->io_source) {
192 		g_source_destroy (item->io_source);
193 		g_source_unref (item->io_source);
194 	}
195 	g_slice_free (SoupMessageQueueItem, item);
196 }
197 
198 /**
199  * soup_message_queue_lookup:
200  * @queue: a #SoupMessageQueue
201  * @msg: a #SoupMessage
202  *
203  * Finds the #SoupMessageQueueItem for @msg in @queue. You must unref
204  * the item with soup_message_queue_unref_item() when you are done
205  * with it.
206  *
207  * Return value: (nullable): the queue item for @msg, or %NULL
208  **/
209 SoupMessageQueueItem *
soup_message_queue_lookup(SoupMessageQueue * queue,SoupMessage * msg)210 soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg)
211 {
212 	SoupMessageQueueItem *item;
213 
214 	g_mutex_lock (&queue->mutex);
215 
216 	item = queue->tail;
217 	while (item && (item->removed || item->msg != msg))
218 		item = item->prev;
219 
220 	if (item)
221 		item->ref_count++;
222 
223 	g_mutex_unlock (&queue->mutex);
224 	return item;
225 }
226 
227 /**
228  * soup_message_queue_first:
229  * @queue: a #SoupMessageQueue
230  *
231  * Gets the first item in @queue. You must unref the item by calling
232  * soup_message_queue_unref_item() on it when you are done.
233  * (soup_message_queue_next() does this for you automatically, so you
234  * only need to unref the item yourself if you are not going to
235  * finishing walking the queue.)
236  *
237  * Return value: the first item in @queue.
238  **/
239 SoupMessageQueueItem *
soup_message_queue_first(SoupMessageQueue * queue)240 soup_message_queue_first (SoupMessageQueue *queue)
241 {
242 	SoupMessageQueueItem *item;
243 
244 	g_mutex_lock (&queue->mutex);
245 
246 	item = queue->head;
247 	while (item && item->removed)
248 		item = item->next;
249 
250 	if (item)
251 		item->ref_count++;
252 
253 	g_mutex_unlock (&queue->mutex);
254 	return item;
255 }
256 
257 /**
258  * soup_message_queue_next:
259  * @queue: a #SoupMessageQueue
260  * @item: a #SoupMessageQueueItem
261  *
262  * Unrefs @item and gets the next item after it in @queue. As with
263  * soup_message_queue_first(), you must unref the returned item
264  * yourself with soup_message_queue_unref_item() if you do not finish
265  * walking the queue.
266  *
267  * Return value: the next item in @queue.
268  **/
269 SoupMessageQueueItem *
soup_message_queue_next(SoupMessageQueue * queue,SoupMessageQueueItem * item)270 soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueItem *item)
271 {
272 	SoupMessageQueueItem *next;
273 
274 	g_mutex_lock (&queue->mutex);
275 
276 	next = item->next;
277 	while (next && next->removed)
278 		next = next->next;
279 	if (next)
280 		next->ref_count++;
281 
282 	g_mutex_unlock (&queue->mutex);
283 	soup_message_queue_item_unref (item);
284 	return next;
285 }
286 
287 /**
288  * soup_message_queue_remove:
289  * @queue: a #SoupMessageQueue
290  * @item: a #SoupMessageQueueItem
291  *
292  * Removes @item from @queue. Note that you probably also need to call
293  * soup_message_queue_unref_item() after this.
294  **/
295 void
soup_message_queue_remove(SoupMessageQueue * queue,SoupMessageQueueItem * item)296 soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueItem *item)
297 {
298 	g_return_if_fail (!item->removed);
299 
300 	g_mutex_lock (&queue->mutex);
301 	item->removed = TRUE;
302 	g_mutex_unlock (&queue->mutex);
303 }
304