1 /* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.volley; 18 19 import android.os.Handler; 20 import android.os.Looper; 21 22 import java.util.ArrayList; 23 import java.util.HashSet; 24 import java.util.List; 25 import java.util.Set; 26 import java.util.concurrent.PriorityBlockingQueue; 27 import java.util.concurrent.atomic.AtomicInteger; 28 29 /** 30 * A request dispatch queue with a thread pool of dispatchers. 31 * 32 * Calling {@link #add(Request)} will enqueue the given Request for dispatch, 33 * resolving from either cache or network on a worker thread, and then delivering 34 * a parsed response on the main thread. 35 */ 36 public class RequestQueue { 37 38 /** Callback interface for completed requests. */ 39 public interface RequestFinishedListener<T> { 40 /** Called when a request has finished processing. */ onRequestFinished(Request<T> request)41 void onRequestFinished(Request<T> request); 42 } 43 44 /** Used for generating monotonically-increasing sequence numbers for requests. */ 45 private final AtomicInteger mSequenceGenerator = new AtomicInteger(); 46 47 /** 48 * The set of all requests currently being processed by this RequestQueue. A Request 49 * will be in this set if it is waiting in any queue or currently being processed by 50 * any dispatcher. 51 */ 52 private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>(); 53 54 /** The cache triage queue. */ 55 private final PriorityBlockingQueue<Request<?>> mCacheQueue = 56 new PriorityBlockingQueue<>(); 57 58 /** The queue of requests that are actually going out to the network. */ 59 private final PriorityBlockingQueue<Request<?>> mNetworkQueue = 60 new PriorityBlockingQueue<>(); 61 62 /** Number of network request dispatcher threads to start. */ 63 private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4; 64 65 /** Cache interface for retrieving and storing responses. */ 66 private final Cache mCache; 67 68 /** Network interface for performing requests. */ 69 private final Network mNetwork; 70 71 /** Response delivery mechanism. */ 72 private final ResponseDelivery mDelivery; 73 74 /** The network dispatchers. */ 75 private final NetworkDispatcher[] mDispatchers; 76 77 /** The cache dispatcher. */ 78 private CacheDispatcher mCacheDispatcher; 79 80 private final List<RequestFinishedListener> mFinishedListeners = 81 new ArrayList<>(); 82 83 /** 84 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 85 * 86 * @param cache A Cache to use for persisting responses to disk 87 * @param network A Network interface for performing HTTP requests 88 * @param threadPoolSize Number of network dispatcher threads to create 89 * @param delivery A ResponseDelivery interface for posting responses and errors 90 */ RequestQueue(Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery)91 public RequestQueue(Cache cache, Network network, int threadPoolSize, 92 ResponseDelivery delivery) { 93 mCache = cache; 94 mNetwork = network; 95 mDispatchers = new NetworkDispatcher[threadPoolSize]; 96 mDelivery = delivery; 97 } 98 99 /** 100 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 101 * 102 * @param cache A Cache to use for persisting responses to disk 103 * @param network A Network interface for performing HTTP requests 104 * @param threadPoolSize Number of network dispatcher threads to create 105 */ RequestQueue(Cache cache, Network network, int threadPoolSize)106 public RequestQueue(Cache cache, Network network, int threadPoolSize) { 107 this(cache, network, threadPoolSize, 108 new ExecutorDelivery(new Handler(Looper.getMainLooper()))); 109 } 110 111 /** 112 * Creates the worker pool. Processing will not begin until {@link #start()} is called. 113 * 114 * @param cache A Cache to use for persisting responses to disk 115 * @param network A Network interface for performing HTTP requests 116 */ RequestQueue(Cache cache, Network network)117 public RequestQueue(Cache cache, Network network) { 118 this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE); 119 } 120 121 /** 122 * Starts the dispatchers in this queue. 123 */ start()124 public void start() { 125 stop(); // Make sure any currently running dispatchers are stopped. 126 // Create the cache dispatcher and start it. 127 mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery); 128 mCacheDispatcher.start(); 129 130 // Create network dispatchers (and corresponding threads) up to the pool size. 131 for (int i = 0; i < mDispatchers.length; i++) { 132 NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork, 133 mCache, mDelivery); 134 mDispatchers[i] = networkDispatcher; 135 networkDispatcher.start(); 136 } 137 } 138 139 /** 140 * Stops the cache and network dispatchers. 141 */ stop()142 public void stop() { 143 if (mCacheDispatcher != null) { 144 mCacheDispatcher.quit(); 145 } 146 for (final NetworkDispatcher mDispatcher : mDispatchers) { 147 if (mDispatcher != null) { 148 mDispatcher.quit(); 149 } 150 } 151 } 152 153 /** 154 * Gets a sequence number. 155 */ getSequenceNumber()156 public int getSequenceNumber() { 157 return mSequenceGenerator.incrementAndGet(); 158 } 159 160 /** 161 * Gets the {@link Cache} instance being used. 162 */ getCache()163 public Cache getCache() { 164 return mCache; 165 } 166 167 /** 168 * A simple predicate or filter interface for Requests, for use by 169 * {@link RequestQueue#cancelAll(RequestFilter)}. 170 */ 171 public interface RequestFilter { apply(Request<?> request)172 boolean apply(Request<?> request); 173 } 174 175 /** 176 * Cancels all requests in this queue for which the given filter applies. 177 * @param filter The filtering function to use 178 */ cancelAll(RequestFilter filter)179 public void cancelAll(RequestFilter filter) { 180 synchronized (mCurrentRequests) { 181 for (Request<?> request : mCurrentRequests) { 182 if (filter.apply(request)) { 183 request.cancel(); 184 } 185 } 186 } 187 } 188 189 /** 190 * Cancels all requests in this queue with the given tag. Tag must be non-null 191 * and equality is by identity. 192 */ cancelAll(final Object tag)193 public void cancelAll(final Object tag) { 194 if (tag == null) { 195 throw new IllegalArgumentException("Cannot cancelAll with a null tag"); 196 } 197 cancelAll(new RequestFilter() { 198 @Override 199 public boolean apply(Request<?> request) { 200 return request.getTag() == tag; 201 } 202 }); 203 } 204 205 /** 206 * Adds a Request to the dispatch queue. 207 * @param request The request to service 208 * @return The passed-in request 209 */ add(Request<T> request)210 public <T> Request<T> add(Request<T> request) { 211 // Tag the request as belonging to this queue and add it to the set of current requests. 212 request.setRequestQueue(this); 213 synchronized (mCurrentRequests) { 214 mCurrentRequests.add(request); 215 } 216 217 // Process requests in the order they are added. 218 request.setSequence(getSequenceNumber()); 219 request.addMarker("add-to-queue"); 220 221 // If the request is uncacheable, skip the cache queue and go straight to the network. 222 if (!request.shouldCache()) { 223 mNetworkQueue.add(request); 224 return request; 225 } 226 mCacheQueue.add(request); 227 return request; 228 } 229 230 /** 231 * Called from {@link Request#finish(String)}, indicating that processing of the given request 232 * has finished. 233 */ finish(Request<T> request)234 <T> void finish(Request<T> request) { 235 // Remove from the set of requests currently being processed. 236 synchronized (mCurrentRequests) { 237 mCurrentRequests.remove(request); 238 } 239 synchronized (mFinishedListeners) { 240 for (RequestFinishedListener<T> listener : mFinishedListeners) { 241 listener.onRequestFinished(request); 242 } 243 } 244 245 } 246 addRequestFinishedListener(RequestFinishedListener<T> listener)247 public <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) { 248 synchronized (mFinishedListeners) { 249 mFinishedListeners.add(listener); 250 } 251 } 252 253 /** 254 * Remove a RequestFinishedListener. Has no effect if listener was not previously added. 255 */ removeRequestFinishedListener(RequestFinishedListener<T> listener)256 public <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) { 257 synchronized (mFinishedListeners) { 258 mFinishedListeners.remove(listener); 259 } 260 } 261 } 262