• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.volley;
18 
19 import android.os.Handler;
20 import android.os.Looper;
21 
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.LinkedList;
25 import java.util.Map;
26 import java.util.Queue;
27 import java.util.Set;
28 import java.util.concurrent.PriorityBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30 
31 /**
32  * A request dispatch queue with a thread pool of dispatchers.
33  *
34  * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
35  * resolving from either cache or network on a worker thread, and then delivering
36  * a parsed response on the main thread.
37  */
38 public class RequestQueue {
39 
40     /** Used for generating monotonically-increasing sequence numbers for requests. */
41     private AtomicInteger mSequenceGenerator = new AtomicInteger();
42 
43     /**
44      * Staging area for requests that already have a duplicate request in flight.
45      *
46      * <ul>
47      *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
48      *          key.</li>
49      *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
50      *          is <em>not</em> contained in that list. Is null if no requests are staged.</li>
51      * </ul>
52      */
53     private final Map<String, Queue<Request<?>>> mWaitingRequests =
54             new HashMap<String, Queue<Request<?>>>();
55 
56     /**
57      * The set of all requests currently being processed by this RequestQueue. A Request
58      * will be in this set if it is waiting in any queue or currently being processed by
59      * any dispatcher.
60      */
61     private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>();
62 
63     /** The cache triage queue. */
64     private final PriorityBlockingQueue<Request<?>> mCacheQueue =
65         new PriorityBlockingQueue<Request<?>>();
66 
67     /** The queue of requests that are actually going out to the network. */
68     private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
69         new PriorityBlockingQueue<Request<?>>();
70 
71     /** Number of network request dispatcher threads to start. */
72     private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
73 
74     /** Cache interface for retrieving and storing responses. */
75     private final Cache mCache;
76 
77     /** Network interface for performing requests. */
78     private final Network mNetwork;
79 
80     /** Response delivery mechanism. */
81     private final ResponseDelivery mDelivery;
82 
83     /** The network dispatchers. */
84     private NetworkDispatcher[] mDispatchers;
85 
86     /** The cache dispatcher. */
87     private CacheDispatcher mCacheDispatcher;
88 
89     /**
90      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
91      *
92      * @param cache A Cache to use for persisting responses to disk
93      * @param network A Network interface for performing HTTP requests
94      * @param threadPoolSize Number of network dispatcher threads to create
95      * @param delivery A ResponseDelivery interface for posting responses and errors
96      */
RequestQueue(Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery)97     public RequestQueue(Cache cache, Network network, int threadPoolSize,
98             ResponseDelivery delivery) {
99         mCache = cache;
100         mNetwork = network;
101         mDispatchers = new NetworkDispatcher[threadPoolSize];
102         mDelivery = delivery;
103     }
104 
105     /**
106      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
107      *
108      * @param cache A Cache to use for persisting responses to disk
109      * @param network A Network interface for performing HTTP requests
110      * @param threadPoolSize Number of network dispatcher threads to create
111      */
RequestQueue(Cache cache, Network network, int threadPoolSize)112     public RequestQueue(Cache cache, Network network, int threadPoolSize) {
113         this(cache, network, threadPoolSize,
114                 new ExecutorDelivery(new Handler(Looper.getMainLooper())));
115     }
116 
117     /**
118      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
119      *
120      * @param cache A Cache to use for persisting responses to disk
121      * @param network A Network interface for performing HTTP requests
122      */
RequestQueue(Cache cache, Network network)123     public RequestQueue(Cache cache, Network network) {
124         this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
125     }
126 
127     /**
128      * Starts the dispatchers in this queue.
129      */
start()130     public void start() {
131         stop();  // Make sure any currently running dispatchers are stopped.
132         // Create the cache dispatcher and start it.
133         mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
134         mCacheDispatcher.start();
135 
136         // Create network dispatchers (and corresponding threads) up to the pool size.
137         for (int i = 0; i < mDispatchers.length; i++) {
138             NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
139                     mCache, mDelivery);
140             mDispatchers[i] = networkDispatcher;
141             networkDispatcher.start();
142         }
143     }
144 
145     /**
146      * Stops the cache and network dispatchers.
147      */
stop()148     public void stop() {
149         if (mCacheDispatcher != null) {
150             mCacheDispatcher.quit();
151         }
152         for (int i = 0; i < mDispatchers.length; i++) {
153             if (mDispatchers[i] != null) {
154                 mDispatchers[i].quit();
155             }
156         }
157     }
158 
159     /**
160      * Gets a sequence number.
161      */
getSequenceNumber()162     public int getSequenceNumber() {
163         return mSequenceGenerator.incrementAndGet();
164     }
165 
166     /**
167      * Gets the {@link Cache} instance being used.
168      */
getCache()169     public Cache getCache() {
170         return mCache;
171     }
172 
173     /**
174      * A simple predicate or filter interface for Requests, for use by
175      * {@link RequestQueue#cancelAll(RequestFilter)}.
176      */
177     public interface RequestFilter {
apply(Request<?> request)178         public boolean apply(Request<?> request);
179     }
180 
181     /**
182      * Cancels all requests in this queue for which the given filter applies.
183      * @param filter The filtering function to use
184      */
cancelAll(RequestFilter filter)185     public void cancelAll(RequestFilter filter) {
186         synchronized (mCurrentRequests) {
187             for (Request<?> request : mCurrentRequests) {
188                 if (filter.apply(request)) {
189                     request.cancel();
190                 }
191             }
192         }
193     }
194 
195     /**
196      * Cancels all requests in this queue with the given tag. Tag must be non-null
197      * and equality is by identity.
198      */
cancelAll(final Object tag)199     public void cancelAll(final Object tag) {
200         if (tag == null) {
201             throw new IllegalArgumentException("Cannot cancelAll with a null tag");
202         }
203         cancelAll(new RequestFilter() {
204             @Override
205             public boolean apply(Request<?> request) {
206                 return request.getTag() == tag;
207             }
208         });
209     }
210 
211     /**
212      * Adds a Request to the dispatch queue.
213      * @param request The request to service
214      * @return The passed-in request
215      */
add(Request<T> request)216     public <T> Request<T> add(Request<T> request) {
217         // Tag the request as belonging to this queue and add it to the set of current requests.
218         request.setRequestQueue(this);
219         synchronized (mCurrentRequests) {
220             mCurrentRequests.add(request);
221         }
222 
223         // Process requests in the order they are added.
224         request.setSequence(getSequenceNumber());
225         request.addMarker("add-to-queue");
226 
227         // If the request is uncacheable, skip the cache queue and go straight to the network.
228         if (!request.shouldCache()) {
229             mNetworkQueue.add(request);
230             return request;
231         }
232 
233         // Insert request into stage if there's already a request with the same cache key in flight.
234         synchronized (mWaitingRequests) {
235             String cacheKey = request.getCacheKey();
236             if (mWaitingRequests.containsKey(cacheKey)) {
237                 // There is already a request in flight. Queue up.
238                 Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
239                 if (stagedRequests == null) {
240                     stagedRequests = new LinkedList<Request<?>>();
241                 }
242                 stagedRequests.add(request);
243                 mWaitingRequests.put(cacheKey, stagedRequests);
244                 if (VolleyLog.DEBUG) {
245                     VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
246                 }
247             } else {
248                 // Insert 'null' queue for this cacheKey, indicating there is now a request in
249                 // flight.
250                 mWaitingRequests.put(cacheKey, null);
251                 mCacheQueue.add(request);
252             }
253             return request;
254         }
255     }
256 
257     /**
258      * Called from {@link Request#finish(String)}, indicating that processing of the given request
259      * has finished.
260      *
261      * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
262      *      <code>request.shouldCache()</code>.</p>
263      */
finish(Request<?> request)264     void finish(Request<?> request) {
265         // Remove from the set of requests currently being processed.
266         synchronized (mCurrentRequests) {
267             mCurrentRequests.remove(request);
268         }
269 
270         if (request.shouldCache()) {
271             synchronized (mWaitingRequests) {
272                 String cacheKey = request.getCacheKey();
273                 Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
274                 if (waitingRequests != null) {
275                     if (VolleyLog.DEBUG) {
276                         VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
277                                 waitingRequests.size(), cacheKey);
278                     }
279                     // Process all queued up requests. They won't be considered as in flight, but
280                     // that's not a problem as the cache has been primed by 'request'.
281                     mCacheQueue.addAll(waitingRequests);
282                 }
283             }
284         }
285     }
286 }
287