/*
 * Copyright (C) 2011 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.volley;

import android.os.Process;
import androidx.annotation.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

/**
 * Provides a thread for performing cache triage on a queue of requests.
 *
 * <p>Requests added to the specified cache queue are resolved from cache. Any deliverable response
 * is posted back to the caller via a {@link ResponseDelivery}. Cache misses and responses that
 * require refresh are enqueued on the specified network queue for processing by a {@link
 * NetworkDispatcher}.
 */
public class CacheDispatcher extends Thread {

    private static final boolean DEBUG = VolleyLog.DEBUG;

    /** The queue of requests coming in for triage. */
    private final BlockingQueue<Request<?>> mCacheQueue;

    /** The queue of requests going out to the network. */
    private final BlockingQueue<Request<?>> mNetworkQueue;

    /** The cache to read from. */
    private final Cache mCache;

    /** For posting responses. */
    private final ResponseDelivery mDelivery;

    /** Used for telling us to die. */
    private volatile boolean mQuit = false;

    /** Manages the list of waiting requests and de-duplicates requests with the same cache key. */
    private final WaitingRequestManager mWaitingRequestManager;

    /**
     * Creates a new cache triage dispatcher thread. You must call {@link #start()} in order to
     * begin processing.
     *
     * @param cacheQueue Queue of incoming requests for triage
     * @param networkQueue Queue to post requests that require network access to
     * @param cache Cache interface to use for resolution
     * @param delivery Delivery interface to use for posting responses
     */
    public CacheDispatcher(
            BlockingQueue<Request<?>> cacheQueue,
            BlockingQueue<Request<?>> networkQueue,
            Cache cache,
            ResponseDelivery delivery) {
        mCacheQueue = cacheQueue;
        mNetworkQueue = networkQueue;
        mCache = cache;
        mDelivery = delivery;
        mWaitingRequestManager = new WaitingRequestManager(this);
    }

    /**
     * Forces this dispatcher to quit immediately. If any requests are still in the queue, they are
     * not guaranteed to be processed.
     */
    public void quit() {
        mQuit = true;
        interrupt();
    }

    @Override
    public void run() {
        if (DEBUG) VolleyLog.v("start new dispatcher");
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);

        // Make a blocking call to initialize the cache.
        mCache.initialize();

        while (true) {
            try {
                processRequest();
            } catch (InterruptedException e) {
                // We may have been interrupted because it was time to quit.
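                // quit() sets mQuit before interrupting this thread, so the check below
                // distinguishes an intentional shutdown from a spurious interrupt.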
                if (mQuit) {
                    Thread.currentThread().interrupt();
                    return;
                }
                VolleyLog.e(
                        "Ignoring spurious interrupt of CacheDispatcher thread; "
                                + "use quit() to terminate it");
            }
        }
    }

    // Extracted to its own method so locals have a constrained liveness scope and the GC does not
    // keep previous request references alive for an indeterminate amount of time. Update
    // consumer-proguard-rules.pro when modifying this. See also
    // https://github.com/google/volley/issues/114
    private void processRequest() throws InterruptedException {
        // Get a request from the cache triage queue, blocking until
        // at least one is available.
        final Request<?> request = mCacheQueue.take();
        processRequest(request);
    }

    @VisibleForTesting
    void processRequest(final Request<?> request) throws InterruptedException {
        request.addMarker("cache-queue-take");
        request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_STARTED);

        try {
            // If the request has been canceled, don't bother dispatching it.
            if (request.isCanceled()) {
                request.finish("cache-discard-canceled");
                return;
            }

            // Attempt to retrieve this item from cache.
            Cache.Entry entry = mCache.get(request.getCacheKey());
            if (entry == null) {
                request.addMarker("cache-miss");
                // Cache miss; send off to the network dispatcher.
                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    mNetworkQueue.put(request);
                }
                return;
            }

            // If it is completely expired, just send it to the network.
            if (entry.isExpired()) {
                request.addMarker("cache-hit-expired");
                request.setCacheEntry(entry);
                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    mNetworkQueue.put(request);
                }
                return;
            }

            // We have a cache hit; parse its data for delivery back to the request.
            request.addMarker("cache-hit");
            Response<?> response =
                    request.parseNetworkResponse(
                            new NetworkResponse(entry.data, entry.responseHeaders));
            request.addMarker("cache-hit-parsed");

            if (!entry.refreshNeeded()) {
                // Completely unexpired cache hit. Just deliver the response.
                mDelivery.postResponse(request, response);
            } else {
                // Soft-expired cache hit. We can deliver the cached response,
                // but we need to also send the request to the network for
                // refreshing.
                request.addMarker("cache-hit-refresh-needed");
                request.setCacheEntry(entry);
                // Mark the response as intermediate.
                response.intermediate = true;

                if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
                    // Post the intermediate response back to the user and have
                    // the delivery then forward the request along to the network.
                    mDelivery.postResponse(
                            request,
                            response,
                            new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        mNetworkQueue.put(request);
                                    } catch (InterruptedException e) {
                                        // Restore the interrupted status
                                        Thread.currentThread().interrupt();
                                    }
                                }
                            });
                } else {
                    // The request has been added to the list of waiting requests and will receive
                    // the network response from the first request once it returns.
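                    // Still deliver the soft-expired cached response as an intermediate result so
                    // the caller is not left waiting on the in-flight network request.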
                    mDelivery.postResponse(request, response);
                }
            }
        } finally {
            request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED);
        }
    }

    private static class WaitingRequestManager implements Request.NetworkRequestCompleteListener {

        /**
         * Staging area for requests that already have a duplicate request in flight.
         *
         * <ul>
         *   <li>containsKey(cacheKey) indicates that there is a request in flight for the given
         *       cache key.
         *   <li>get(cacheKey) returns the waiting requests for the given cache key. The in-flight
         *       request is <em>not</em> contained in that list. The value is null if no requests
         *       are staged.
         * </ul>
         */
        private final Map<String, List<Request<?>>> mWaitingRequests = new HashMap<>();

        private final CacheDispatcher mCacheDispatcher;

        WaitingRequestManager(CacheDispatcher cacheDispatcher) {
            mCacheDispatcher = cacheDispatcher;
        }

        /** Request received a valid response that can be used by other waiting requests. */
        @Override
        public void onResponseReceived(Request<?> request, Response<?> response) {
            if (response.cacheEntry == null || response.cacheEntry.isExpired()) {
                onNoUsableResponseReceived(request);
                return;
            }
            String cacheKey = request.getCacheKey();
            List<Request<?>> waitingRequests;
            synchronized (this) {
                waitingRequests = mWaitingRequests.remove(cacheKey);
            }
            if (waitingRequests != null) {
                if (VolleyLog.DEBUG) {
                    VolleyLog.v(
                            "Releasing %d waiting requests for cacheKey=%s.",
                            waitingRequests.size(), cacheKey);
                }
                // Process all queued up requests.
                for (Request<?> waiting : waitingRequests) {
                    mCacheDispatcher.mDelivery.postResponse(waiting, response);
                }
            }
        }

        /** No valid response was received from the network; release the waiting requests. */
        @Override
        public synchronized void onNoUsableResponseReceived(Request<?> request) {
            String cacheKey = request.getCacheKey();
            List<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
            if (waitingRequests != null && !waitingRequests.isEmpty()) {
                if (VolleyLog.DEBUG) {
                    VolleyLog.v(
                            "%d waiting requests for cacheKey=%s; resend to network",
                            waitingRequests.size(), cacheKey);
                }
                Request<?> nextInLine = waitingRequests.remove(0);
                mWaitingRequests.put(cacheKey, waitingRequests);
                nextInLine.setNetworkRequestCompleteListener(this);
                try {
                    mCacheDispatcher.mNetworkQueue.put(nextInLine);
                } catch (InterruptedException iex) {
                    VolleyLog.e("Couldn't add request to queue. %s", iex.toString());
                    // Restore the interrupted status of the calling thread (i.e. NetworkDispatcher)
                    Thread.currentThread().interrupt();
                    // Quit the current CacheDispatcher thread.
                    mCacheDispatcher.quit();
                }
            }
        }

        /**
         * For cacheable requests, if a request for the same cache key is already in flight, add it
         * to a queue to wait for that in-flight request to finish.
         *
         * @return whether the request was queued. If false, we should continue issuing the request
         *     over the network. If true, we should put the request on hold to be processed when
         *     the in-flight request finishes.
         */
        private synchronized boolean maybeAddToWaitingRequests(Request<?> request) {
            String cacheKey = request.getCacheKey();
            // Insert request into stage if there's already a request with the same cache key
            // in flight.
            if (mWaitingRequests.containsKey(cacheKey)) {
                // There is already a request in flight. Queue up.
                List<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
                if (stagedRequests == null) {
                    stagedRequests = new ArrayList<>();
                }
                request.addMarker("waiting-for-response");
                stagedRequests.add(request);
                mWaitingRequests.put(cacheKey, stagedRequests);
                if (VolleyLog.DEBUG) {
                    VolleyLog.d("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
                }
                return true;
            } else {
                // Insert 'null' queue for this cacheKey, indicating there is now a request in
                // flight.
                mWaitingRequests.put(cacheKey, null);
                request.setNetworkRequestCompleteListener(this);
                if (VolleyLog.DEBUG) {
                    VolleyLog.d("new request, sending to network %s", cacheKey);
                }
                return false;
            }
        }
    }
}