1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.volley; 18 19 import android.os.Handler; 20 import android.os.Looper; 21 22 import java.util.HashMap; 23 import java.util.HashSet; 24 import java.util.LinkedList; 25 import java.util.Map; 26 import java.util.Queue; 27 import java.util.Set; 28 import java.util.concurrent.PriorityBlockingQueue; 29 import java.util.concurrent.atomic.AtomicInteger; 30 31 /** 32 * A request dispatch queue with a thread pool of dispatchers. 33 * 34 * Calling {@link #add(Request)} will enqueue the given Request for dispatch, 35 * resolving from either cache or network on a worker thread, and then delivering 36 * a parsed response on the main thread. 37 */ 38 public class RequestQueue { 39 40 /** Used for generating monotonically-increasing sequence numbers for requests. */ 41 private AtomicInteger mSequenceGenerator = new AtomicInteger(); 42 43 /** 44 * Staging area for requests that already have a duplicate request in flight. 45 * 46 * <ul> 47 * <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache 48 * key.</li> 49 * <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request 50 * is <em>not</em> contained in that list. Is null if no requests are staged.</li> 51 * </ul> 52 */ 53 private final Map<String, Queue<Request<?>>> mWaitingRequests = 54 new HashMap<String, Queue<Request<?>>>(); 55 56 /** 57 * The set of all requests currently being processed by this RequestQueue. A Request 58 * will be in this set if it is waiting in any queue or currently being processed by 59 * any dispatcher. 60 */ 61 private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>(); 62 63 /** The cache triage queue. */ 64 private final PriorityBlockingQueue<Request<?>> mCacheQueue = 65 new PriorityBlockingQueue<Request<?>>(); 66 67 /** The queue of requests that are actually going out to the network. */ 68 private final PriorityBlockingQueue<Request<?>> mNetworkQueue = 69 new PriorityBlockingQueue<Request<?>>(); 70 71 /** Number of network request dispatcher threads to start. */ 72 private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4; 73 74 /** Cache interface for retrieving and storing responses. */ 75 private final Cache mCache; 76 77 /** Network interface for performing requests. */ 78 private final Network mNetwork; 79 80 /** Response delivery mechanism. */ 81 private final ResponseDelivery mDelivery; 82 83 /** The network dispatchers. */ 84 private NetworkDispatcher[] mDispatchers; 85 86 /** The cache dispatcher. */ 87 private CacheDispatcher mCacheDispatcher; 88 89 /** 90 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 91 * 92 * @param cache A Cache to use for persisting responses to disk 93 * @param network A Network interface for performing HTTP requests 94 * @param threadPoolSize Number of network dispatcher threads to create 95 * @param delivery A ResponseDelivery interface for posting responses and errors 96 */ RequestQueue(Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery)97 public RequestQueue(Cache cache, Network network, int threadPoolSize, 98 ResponseDelivery delivery) { 99 mCache = cache; 100 mNetwork = network; 101 mDispatchers = new NetworkDispatcher[threadPoolSize]; 102 mDelivery = delivery; 103 } 104 105 /** 106 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 107 * 108 * @param cache A Cache to use for persisting responses to disk 109 * @param network A Network interface for performing HTTP requests 110 * @param threadPoolSize Number of network dispatcher threads to create 111 */ RequestQueue(Cache cache, Network network, int threadPoolSize)112 public RequestQueue(Cache cache, Network network, int threadPoolSize) { 113 this(cache, network, threadPoolSize, 114 new ExecutorDelivery(new Handler(Looper.getMainLooper()))); 115 } 116 117 /** 118 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 119 * 120 * @param cache A Cache to use for persisting responses to disk 121 * @param network A Network interface for performing HTTP requests 122 */ RequestQueue(Cache cache, Network network)123 public RequestQueue(Cache cache, Network network) { 124 this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE); 125 } 126 127 /** 128 * Starts the dispatchers in this queue. 129 */ start()130 public void start() { 131 stop(); // Make sure any currently running dispatchers are stopped. 132 // Create the cache dispatcher and start it. 133 mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery); 134 mCacheDispatcher.start(); 135 136 // Create network dispatchers (and corresponding threads) up to the pool size. 137 for (int i = 0; i < mDispatchers.length; i++) { 138 NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork, 139 mCache, mDelivery); 140 mDispatchers[i] = networkDispatcher; 141 networkDispatcher.start(); 142 } 143 } 144 145 /** 146 * Stops the cache and network dispatchers. 147 */ stop()148 public void stop() { 149 if (mCacheDispatcher != null) { 150 mCacheDispatcher.quit(); 151 } 152 for (int i = 0; i < mDispatchers.length; i++) { 153 if (mDispatchers[i] != null) { 154 mDispatchers[i].quit(); 155 } 156 } 157 } 158 159 /** 160 * Gets a sequence number. 161 */ getSequenceNumber()162 public int getSequenceNumber() { 163 return mSequenceGenerator.incrementAndGet(); 164 } 165 166 /** 167 * Gets the {@link Cache} instance being used. 168 */ getCache()169 public Cache getCache() { 170 return mCache; 171 } 172 173 /** 174 * A simple predicate or filter interface for Requests, for use by 175 * {@link RequestQueue#cancelAll(RequestFilter)}. 176 */ 177 public interface RequestFilter { apply(Request<?> request)178 public boolean apply(Request<?> request); 179 } 180 181 /** 182 * Cancels all requests in this queue for which the given filter applies. 183 * @param filter The filtering function to use 184 */ cancelAll(RequestFilter filter)185 public void cancelAll(RequestFilter filter) { 186 synchronized (mCurrentRequests) { 187 for (Request<?> request : mCurrentRequests) { 188 if (filter.apply(request)) { 189 request.cancel(); 190 } 191 } 192 } 193 } 194 195 /** 196 * Cancels all requests in this queue with the given tag. Tag must be non-null 197 * and equality is by identity. 198 */ cancelAll(final Object tag)199 public void cancelAll(final Object tag) { 200 if (tag == null) { 201 throw new IllegalArgumentException("Cannot cancelAll with a null tag"); 202 } 203 cancelAll(new RequestFilter() { 204 @Override 205 public boolean apply(Request<?> request) { 206 return request.getTag() == tag; 207 } 208 }); 209 } 210 211 /** 212 * Adds a Request to the dispatch queue. 213 * @param request The request to service 214 * @return The passed-in request 215 */ add(Request<T> request)216 public <T> Request<T> add(Request<T> request) { 217 // Tag the request as belonging to this queue and add it to the set of current requests. 218 request.setRequestQueue(this); 219 synchronized (mCurrentRequests) { 220 mCurrentRequests.add(request); 221 } 222 223 // Process requests in the order they are added. 224 request.setSequence(getSequenceNumber()); 225 request.addMarker("add-to-queue"); 226 227 // If the request is uncacheable, skip the cache queue and go straight to the network. 228 if (!request.shouldCache()) { 229 mNetworkQueue.add(request); 230 return request; 231 } 232 233 // Insert request into stage if there's already a request with the same cache key in flight. 234 synchronized (mWaitingRequests) { 235 String cacheKey = request.getCacheKey(); 236 if (mWaitingRequests.containsKey(cacheKey)) { 237 // There is already a request in flight. Queue up. 238 Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey); 239 if (stagedRequests == null) { 240 stagedRequests = new LinkedList<Request<?>>(); 241 } 242 stagedRequests.add(request); 243 mWaitingRequests.put(cacheKey, stagedRequests); 244 if (VolleyLog.DEBUG) { 245 VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); 246 } 247 } else { 248 // Insert 'null' queue for this cacheKey, indicating there is now a request in 249 // flight. 250 mWaitingRequests.put(cacheKey, null); 251 mCacheQueue.add(request); 252 } 253 return request; 254 } 255 } 256 257 /** 258 * Called from {@link Request#finish(String)}, indicating that processing of the given request 259 * has finished. 260 * 261 * <p>Releases waiting requests for <code>request.getCacheKey()</code> if 262 * <code>request.shouldCache()</code>.</p> 263 */ finish(Request<?> request)264 void finish(Request<?> request) { 265 // Remove from the set of requests currently being processed. 266 synchronized (mCurrentRequests) { 267 mCurrentRequests.remove(request); 268 } 269 270 if (request.shouldCache()) { 271 synchronized (mWaitingRequests) { 272 String cacheKey = request.getCacheKey(); 273 Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey); 274 if (waitingRequests != null) { 275 if (VolleyLog.DEBUG) { 276 VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.", 277 waitingRequests.size(), cacheKey); 278 } 279 // Process all queued up requests. They won't be considered as in flight, but 280 // that's not a problem as the cache has been primed by 'request'. 281 mCacheQueue.addAll(waitingRequests); 282 } 283 } 284 } 285 } 286 } 287