• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.android.volley;
18 
19 import android.os.Process;
20 import androidx.annotation.VisibleForTesting;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.BlockingQueue;
26 
/**
 * Provides a thread for performing cache triage on a queue of requests.
 *
 * <p>Requests added to the specified cache queue are resolved from cache. Any deliverable response
 * is posted back to the caller via a {@link ResponseDelivery}. Cache misses and responses that
 * require refresh are enqueued on the specified network queue for processing by a {@link
 * NetworkDispatcher}.
 */
35 public class CacheDispatcher extends Thread {
36 
37     private static final boolean DEBUG = VolleyLog.DEBUG;
38 
39     /** The queue of requests coming in for triage. */
40     private final BlockingQueue<Request<?>> mCacheQueue;
41 
42     /** The queue of requests going out to the network. */
43     private final BlockingQueue<Request<?>> mNetworkQueue;
44 
45     /** The cache to read from. */
46     private final Cache mCache;
47 
48     /** For posting responses. */
49     private final ResponseDelivery mDelivery;
50 
51     /** Used for telling us to die. */
52     private volatile boolean mQuit = false;
53 
54     /** Manage list of waiting requests and de-duplicate requests with same cache key. */
55     private final WaitingRequestManager mWaitingRequestManager;
56 
57     /**
58      * Creates a new cache triage dispatcher thread. You must call {@link #start()} in order to
59      * begin processing.
60      *
61      * @param cacheQueue Queue of incoming requests for triage
62      * @param networkQueue Queue to post requests that require network to
63      * @param cache Cache interface to use for resolution
64      * @param delivery Delivery interface to use for posting responses
65      */
CacheDispatcher( BlockingQueue<Request<?>> cacheQueue, BlockingQueue<Request<?>> networkQueue, Cache cache, ResponseDelivery delivery)66     public CacheDispatcher(
67             BlockingQueue<Request<?>> cacheQueue,
68             BlockingQueue<Request<?>> networkQueue,
69             Cache cache,
70             ResponseDelivery delivery) {
71         mCacheQueue = cacheQueue;
72         mNetworkQueue = networkQueue;
73         mCache = cache;
74         mDelivery = delivery;
75         mWaitingRequestManager = new WaitingRequestManager(this);
76     }
77 
78     /**
79      * Forces this dispatcher to quit immediately. If any requests are still in the queue, they are
80      * not guaranteed to be processed.
81      */
quit()82     public void quit() {
83         mQuit = true;
84         interrupt();
85     }
86 
87     @Override
run()88     public void run() {
89         if (DEBUG) VolleyLog.v("start new dispatcher");
90         Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
91 
92         // Make a blocking call to initialize the cache.
93         mCache.initialize();
94 
95         while (true) {
96             try {
97                 processRequest();
98             } catch (InterruptedException e) {
99                 // We may have been interrupted because it was time to quit.
100                 if (mQuit) {
101                     Thread.currentThread().interrupt();
102                     return;
103                 }
104                 VolleyLog.e(
105                         "Ignoring spurious interrupt of CacheDispatcher thread; "
106                                 + "use quit() to terminate it");
107             }
108         }
109     }
110 
111     // Extracted to its own method to ensure locals have a constrained liveness scope by the GC.
112     // This is needed to avoid keeping previous request references alive for an indeterminate amount
113     // of time. Update consumer-proguard-rules.pro when modifying this. See also
114     // https://github.com/google/volley/issues/114
processRequest()115     private void processRequest() throws InterruptedException {
116         // Get a request from the cache triage queue, blocking until
117         // at least one is available.
118         final Request<?> request = mCacheQueue.take();
119         processRequest(request);
120     }
121 
122     @VisibleForTesting
processRequest(final Request<?> request)123     void processRequest(final Request<?> request) throws InterruptedException {
124         request.addMarker("cache-queue-take");
125         request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_STARTED);
126 
127         try {
128             // If the request has been canceled, don't bother dispatching it.
129             if (request.isCanceled()) {
130                 request.finish("cache-discard-canceled");
131                 return;
132             }
133 
134             // Attempt to retrieve this item from cache.
135             Cache.Entry entry = mCache.get(request.getCacheKey());
136             if (entry == null) {
137                 request.addMarker("cache-miss");
138                 // Cache miss; send off to the network dispatcher.
139                 if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
140                     mNetworkQueue.put(request);
141                 }
142                 return;
143             }
144 
145             // If it is completely expired, just send it to the network.
146             if (entry.isExpired()) {
147                 request.addMarker("cache-hit-expired");
148                 request.setCacheEntry(entry);
149                 if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
150                     mNetworkQueue.put(request);
151                 }
152                 return;
153             }
154 
155             // We have a cache hit; parse its data for delivery back to the request.
156             request.addMarker("cache-hit");
157             Response<?> response =
158                     request.parseNetworkResponse(
159                             new NetworkResponse(entry.data, entry.responseHeaders));
160             request.addMarker("cache-hit-parsed");
161 
162             if (!entry.refreshNeeded()) {
163                 // Completely unexpired cache hit. Just deliver the response.
164                 mDelivery.postResponse(request, response);
165             } else {
166                 // Soft-expired cache hit. We can deliver the cached response,
167                 // but we need to also send the request to the network for
168                 // refreshing.
169                 request.addMarker("cache-hit-refresh-needed");
170                 request.setCacheEntry(entry);
171                 // Mark the response as intermediate.
172                 response.intermediate = true;
173 
174                 if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
175                     // Post the intermediate response back to the user and have
176                     // the delivery then forward the request along to the network.
177                     mDelivery.postResponse(
178                             request,
179                             response,
180                             new Runnable() {
181                                 @Override
182                                 public void run() {
183                                     try {
184                                         mNetworkQueue.put(request);
185                                     } catch (InterruptedException e) {
186                                         // Restore the interrupted status
187                                         Thread.currentThread().interrupt();
188                                     }
189                                 }
190                             });
191                 } else {
192                     // request has been added to list of waiting requests
193                     // to receive the network response from the first request once it returns.
194                     mDelivery.postResponse(request, response);
195                 }
196             }
197         } finally {
198             request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED);
199         }
200     }
201 
202     private static class WaitingRequestManager implements Request.NetworkRequestCompleteListener {
203 
204         /**
205          * Staging area for requests that already have a duplicate request in flight.
206          *
207          * <ul>
208          *   <li>containsKey(cacheKey) indicates that there is a request in flight for the given
209          *       cache key.
210          *   <li>get(cacheKey) returns waiting requests for the given cache key. The in flight
211          *       request is <em>not</em> contained in that list. Is null if no requests are staged.
212          * </ul>
213          */
214         private final Map<String, List<Request<?>>> mWaitingRequests = new HashMap<>();
215 
216         private final CacheDispatcher mCacheDispatcher;
217 
WaitingRequestManager(CacheDispatcher cacheDispatcher)218         WaitingRequestManager(CacheDispatcher cacheDispatcher) {
219             mCacheDispatcher = cacheDispatcher;
220         }
221 
222         /** Request received a valid response that can be used by other waiting requests. */
223         @Override
onResponseReceived(Request<?> request, Response<?> response)224         public void onResponseReceived(Request<?> request, Response<?> response) {
225             if (response.cacheEntry == null || response.cacheEntry.isExpired()) {
226                 onNoUsableResponseReceived(request);
227                 return;
228             }
229             String cacheKey = request.getCacheKey();
230             List<Request<?>> waitingRequests;
231             synchronized (this) {
232                 waitingRequests = mWaitingRequests.remove(cacheKey);
233             }
234             if (waitingRequests != null) {
235                 if (VolleyLog.DEBUG) {
236                     VolleyLog.v(
237                             "Releasing %d waiting requests for cacheKey=%s.",
238                             waitingRequests.size(), cacheKey);
239                 }
240                 // Process all queued up requests.
241                 for (Request<?> waiting : waitingRequests) {
242                     mCacheDispatcher.mDelivery.postResponse(waiting, response);
243                 }
244             }
245         }
246 
247         /** No valid response received from network, release waiting requests. */
248         @Override
onNoUsableResponseReceived(Request<?> request)249         public synchronized void onNoUsableResponseReceived(Request<?> request) {
250             String cacheKey = request.getCacheKey();
251             List<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
252             if (waitingRequests != null && !waitingRequests.isEmpty()) {
253                 if (VolleyLog.DEBUG) {
254                     VolleyLog.v(
255                             "%d waiting requests for cacheKey=%s; resend to network",
256                             waitingRequests.size(), cacheKey);
257                 }
258                 Request<?> nextInLine = waitingRequests.remove(0);
259                 mWaitingRequests.put(cacheKey, waitingRequests);
260                 nextInLine.setNetworkRequestCompleteListener(this);
261                 try {
262                     mCacheDispatcher.mNetworkQueue.put(nextInLine);
263                 } catch (InterruptedException iex) {
264                     VolleyLog.e("Couldn't add request to queue. %s", iex.toString());
265                     // Restore the interrupted status of the calling thread (i.e. NetworkDispatcher)
266                     Thread.currentThread().interrupt();
267                     // Quit the current CacheDispatcher thread.
268                     mCacheDispatcher.quit();
269                 }
270             }
271         }
272 
273         /**
274          * For cacheable requests, if a request for the same cache key is already in flight, add it
275          * to a queue to wait for that in-flight request to finish.
276          *
277          * @return whether the request was queued. If false, we should continue issuing the request
278          *     over the network. If true, we should put the request on hold to be processed when the
279          *     in-flight request finishes.
280          */
maybeAddToWaitingRequests(Request<?> request)281         private synchronized boolean maybeAddToWaitingRequests(Request<?> request) {
282             String cacheKey = request.getCacheKey();
283             // Insert request into stage if there's already a request with the same cache key
284             // in flight.
285             if (mWaitingRequests.containsKey(cacheKey)) {
286                 // There is already a request in flight. Queue up.
287                 List<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
288                 if (stagedRequests == null) {
289                     stagedRequests = new ArrayList<>();
290                 }
291                 request.addMarker("waiting-for-response");
292                 stagedRequests.add(request);
293                 mWaitingRequests.put(cacheKey, stagedRequests);
294                 if (VolleyLog.DEBUG) {
295                     VolleyLog.d("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
296                 }
297                 return true;
298             } else {
299                 // Insert 'null' queue for this cacheKey, indicating there is now a request in
300                 // flight.
301                 mWaitingRequests.put(cacheKey, null);
302                 request.setNetworkRequestCompleteListener(this);
303                 if (VolleyLog.DEBUG) {
304                     VolleyLog.d("new request, sending to network %s", cacheKey);
305                 }
306                 return false;
307             }
308         }
309     }
310 }
311