• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.volley;
18 
19 import android.os.Handler;
20 import android.os.Looper;
21 
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.PriorityBlockingQueue;
27 import java.util.concurrent.atomic.AtomicInteger;
28 
29 /**
30  * A request dispatch queue with a thread pool of dispatchers.
31  *
32  * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
33  * resolving from either cache or network on a worker thread, and then delivering
34  * a parsed response on the main thread.
35  */
36 public class RequestQueue {
37 
38     /** Callback interface for completed requests. */
39     public interface RequestFinishedListener<T> {
40         /** Called when a request has finished processing. */
onRequestFinished(Request<T> request)41         void onRequestFinished(Request<T> request);
42     }
43 
44     /** Used for generating monotonically-increasing sequence numbers for requests. */
45     private final AtomicInteger mSequenceGenerator = new AtomicInteger();
46 
47     /**
48      * The set of all requests currently being processed by this RequestQueue. A Request
49      * will be in this set if it is waiting in any queue or currently being processed by
50      * any dispatcher.
51      */
52     private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>();
53 
54     /** The cache triage queue. */
55     private final PriorityBlockingQueue<Request<?>> mCacheQueue =
56             new PriorityBlockingQueue<>();
57 
58     /** The queue of requests that are actually going out to the network. */
59     private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
60             new PriorityBlockingQueue<>();
61 
62     /** Number of network request dispatcher threads to start. */
63     private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
64 
65     /** Cache interface for retrieving and storing responses. */
66     private final Cache mCache;
67 
68     /** Network interface for performing requests. */
69     private final Network mNetwork;
70 
71     /** Response delivery mechanism. */
72     private final ResponseDelivery mDelivery;
73 
74     /** The network dispatchers. */
75     private final NetworkDispatcher[] mDispatchers;
76 
77     /** The cache dispatcher. */
78     private CacheDispatcher mCacheDispatcher;
79 
80     private final List<RequestFinishedListener> mFinishedListeners =
81             new ArrayList<>();
82 
83     /**
84      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
85      *
86      * @param cache A Cache to use for persisting responses to disk
87      * @param network A Network interface for performing HTTP requests
88      * @param threadPoolSize Number of network dispatcher threads to create
89      * @param delivery A ResponseDelivery interface for posting responses and errors
90      */
RequestQueue(Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery)91     public RequestQueue(Cache cache, Network network, int threadPoolSize,
92             ResponseDelivery delivery) {
93         mCache = cache;
94         mNetwork = network;
95         mDispatchers = new NetworkDispatcher[threadPoolSize];
96         mDelivery = delivery;
97     }
98 
99     /**
100      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
101      *
102      * @param cache A Cache to use for persisting responses to disk
103      * @param network A Network interface for performing HTTP requests
104      * @param threadPoolSize Number of network dispatcher threads to create
105      */
RequestQueue(Cache cache, Network network, int threadPoolSize)106     public RequestQueue(Cache cache, Network network, int threadPoolSize) {
107         this(cache, network, threadPoolSize,
108                 new ExecutorDelivery(new Handler(Looper.getMainLooper())));
109     }
110 
111     /**
112      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
113      *
114      * @param cache A Cache to use for persisting responses to disk
115      * @param network A Network interface for performing HTTP requests
116      */
RequestQueue(Cache cache, Network network)117     public RequestQueue(Cache cache, Network network) {
118         this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
119     }
120 
121     /**
122      * Starts the dispatchers in this queue.
123      */
start()124     public void start() {
125         stop();  // Make sure any currently running dispatchers are stopped.
126         // Create the cache dispatcher and start it.
127         mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
128         mCacheDispatcher.start();
129 
130         // Create network dispatchers (and corresponding threads) up to the pool size.
131         for (int i = 0; i < mDispatchers.length; i++) {
132             NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
133                     mCache, mDelivery);
134             mDispatchers[i] = networkDispatcher;
135             networkDispatcher.start();
136         }
137     }
138 
139     /**
140      * Stops the cache and network dispatchers.
141      */
stop()142     public void stop() {
143         if (mCacheDispatcher != null) {
144             mCacheDispatcher.quit();
145         }
146         for (final NetworkDispatcher mDispatcher : mDispatchers) {
147             if (mDispatcher != null) {
148                 mDispatcher.quit();
149             }
150         }
151     }
152 
153     /**
154      * Gets a sequence number.
155      */
getSequenceNumber()156     public int getSequenceNumber() {
157         return mSequenceGenerator.incrementAndGet();
158     }
159 
160     /**
161      * Gets the {@link Cache} instance being used.
162      */
getCache()163     public Cache getCache() {
164         return mCache;
165     }
166 
167     /**
168      * A simple predicate or filter interface for Requests, for use by
169      * {@link RequestQueue#cancelAll(RequestFilter)}.
170      */
171     public interface RequestFilter {
apply(Request<?> request)172         boolean apply(Request<?> request);
173     }
174 
175     /**
176      * Cancels all requests in this queue for which the given filter applies.
177      * @param filter The filtering function to use
178      */
cancelAll(RequestFilter filter)179     public void cancelAll(RequestFilter filter) {
180         synchronized (mCurrentRequests) {
181             for (Request<?> request : mCurrentRequests) {
182                 if (filter.apply(request)) {
183                     request.cancel();
184                 }
185             }
186         }
187     }
188 
189     /**
190      * Cancels all requests in this queue with the given tag. Tag must be non-null
191      * and equality is by identity.
192      */
cancelAll(final Object tag)193     public void cancelAll(final Object tag) {
194         if (tag == null) {
195             throw new IllegalArgumentException("Cannot cancelAll with a null tag");
196         }
197         cancelAll(new RequestFilter() {
198             @Override
199             public boolean apply(Request<?> request) {
200                 return request.getTag() == tag;
201             }
202         });
203     }
204 
205     /**
206      * Adds a Request to the dispatch queue.
207      * @param request The request to service
208      * @return The passed-in request
209      */
add(Request<T> request)210     public <T> Request<T> add(Request<T> request) {
211         // Tag the request as belonging to this queue and add it to the set of current requests.
212         request.setRequestQueue(this);
213         synchronized (mCurrentRequests) {
214             mCurrentRequests.add(request);
215         }
216 
217         // Process requests in the order they are added.
218         request.setSequence(getSequenceNumber());
219         request.addMarker("add-to-queue");
220 
221         // If the request is uncacheable, skip the cache queue and go straight to the network.
222         if (!request.shouldCache()) {
223             mNetworkQueue.add(request);
224             return request;
225         }
226         mCacheQueue.add(request);
227         return request;
228      }
229 
230     /**
231      * Called from {@link Request#finish(String)}, indicating that processing of the given request
232      * has finished.
233      */
finish(Request<T> request)234     <T> void finish(Request<T> request) {
235         // Remove from the set of requests currently being processed.
236         synchronized (mCurrentRequests) {
237             mCurrentRequests.remove(request);
238         }
239         synchronized (mFinishedListeners) {
240           for (RequestFinishedListener<T> listener : mFinishedListeners) {
241             listener.onRequestFinished(request);
242           }
243         }
244 
245     }
246 
addRequestFinishedListener(RequestFinishedListener<T> listener)247     public  <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
248       synchronized (mFinishedListeners) {
249         mFinishedListeners.add(listener);
250       }
251     }
252 
253     /**
254      * Remove a RequestFinishedListener. Has no effect if listener was not previously added.
255      */
removeRequestFinishedListener(RequestFinishedListener<T> listener)256     public  <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
257       synchronized (mFinishedListeners) {
258         mFinishedListeners.remove(listener);
259       }
260     }
261 }
262