1 // Copyright 2021 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package com.google.android.downloader; 16 17 import static com.google.common.base.Preconditions.checkArgument; 18 import static com.google.common.base.Preconditions.checkNotNull; 19 import static com.google.common.base.Preconditions.checkState; 20 import static com.google.common.base.Predicates.instanceOf; 21 import static com.google.common.base.Throwables.getCausalChain; 22 import static java.util.concurrent.TimeUnit.MILLISECONDS; 23 import static java.util.concurrent.TimeUnit.SECONDS; 24 25 import com.google.auto.value.AutoValue; 26 import com.google.common.annotations.VisibleForTesting; 27 import com.google.common.collect.ImmutableMap; 28 import com.google.common.collect.ImmutableSet; 29 import com.google.common.collect.Iterables; 30 import com.google.common.net.HttpHeaders; 31 import com.google.common.util.concurrent.ClosingFuture; 32 import com.google.common.util.concurrent.FluentFuture; 33 import com.google.common.util.concurrent.FutureCallback; 34 import com.google.common.util.concurrent.Futures; 35 import com.google.common.util.concurrent.ListenableFuture; 36 import com.google.common.util.concurrent.ListenableFutureTask; 37 import com.google.common.util.concurrent.MoreExecutors; 38 import com.google.errorprone.annotations.CheckReturnValue; 39 import com.google.errorprone.annotations.FormatMethod; 40 import com.google.errorprone.annotations.FormatString; 41 import com.google.errorprone.annotations.concurrent.GuardedBy; 42 import java.io.IOException; 43 import java.net.HttpURLConnection; 44 import java.net.URI; 45 import java.nio.channels.WritableByteChannel; 46 import java.text.ParseException; 47 import java.text.SimpleDateFormat; 48 import java.util.ArrayDeque; 49 import java.util.ArrayList; 50 import java.util.Date; 51 import java.util.HashMap; 52 import java.util.IdentityHashMap; 53 import java.util.List; 54 import java.util.Locale; 55 import java.util.Map; 56 import java.util.Queue; 57 import java.util.TimeZone; 58 import java.util.concurrent.CancellationException; 59 import java.util.concurrent.Executor; 60 import java.util.concurrent.TimeoutException; 61 import java.util.regex.Matcher; 62 import java.util.regex.Pattern; 63 import javax.annotation.Nullable; 64 65 /** 66 * In-process download system. Provides a light-weight mechanism to download resources over the 67 * network. Downloader includes the following features: 68 * 69 * <ul> 70 * <li>Configurable choice of network stack, with several defaults available out of the box. 71 * <li>Fully asynchronous behavior, allowing downloads to progress and complete without blocking 72 * (assuming the underlying network stack can avoid blocking). 73 * <li>Support for HTTP range requests, allowing partial downloads to resume mid-way through, and 74 * avoiding redownloading of fully-downloaded requests. 75 * <li>Detection of network interruptions, and a configurable way to retry requests that have 76 * failed after losing connectivity. 77 * </ul> 78 * 79 * <p>Note that because this library performs downloads in-process, it is subject to having 80 * downloads stall or abort when the app is suspended or killed. Download requests only live in 81 * memory, and thus are lost when the process ends. Library users should persist the set of 82 * downloads to be executed in persistent storage, such as a SQLite database, and run the downloads 83 * in the context of a persistent operation (either a foreground service or via Android's {@link 84 * android.app.job.JobScheduler} mechanism). 85 * 86 * <p>This is intended as a functional (but not drop-in) replacement for Android's {@link 87 * android.app.DownloadManager}, which has a number of issues: 88 * 89 * <ul> 90 * <li>It relies on the network stack built into Android, and thus cannot be patched to receive 91 * updates. For older versions of Android, that means the DownloadManager is vulnerable to 92 * issues in the network stack, such as b/18432707, which can result in MITM attacks. 93 * <li>When downloading to external storage on older versions of Android (via {@link 94 * android.app.DownloadManager.Request#setDestinationInExternalFilesDir} or {@link 95 * android.app.DownloadManager.Request#setDestinationInExternalPublicDir}), downloads may 96 * exceed the maximum size of the download directory (see b/22605830). 97 * <li>Downloads may pause and never resume again without external interference (b/18110151) 98 * <li>The DownloadManager may lose track of some files (b/18265497) 99 * </ul> 100 * 101 * <p>This library mitigates these issues by performing downloads in-process over the app-provided 102 * network stack, thus ensuring that an up-to-date network stack is used, and handing off storage 103 * management directly to the app without any additional restrictions. 104 */ 105 public class Downloader { 106 @VisibleForTesting static final int HTTP_RANGE_NOT_SATISFIABLE = 416; 107 @VisibleForTesting static final int HTTP_PARTIAL_CONTENT = 206; 108 109 private static final ImmutableSet<String> SCHEMES_REQUIRING_CONNECTIVITY = 110 ImmutableSet.of("http", "https"); 111 112 private static final Pattern CONTENT_RANGE_HEADER_PATTERN = 113 Pattern.compile("bytes (\\d+)-(\\d+)/(\\d+|\\*)"); 114 115 @GuardedBy("SIMPLE_DATE_FORMAT_LOCK") 116 private static final SimpleDateFormat RFC_1123_FORMATTER; 117 118 private static final Object SIMPLE_DATE_FORMAT_LOCK = new Object(); 119 120 static { 121 synchronized (SIMPLE_DATE_FORMAT_LOCK) { 122 RFC_1123_FORMATTER = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.US); 123 RFC_1123_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC")); 124 } 125 } 126 127 /** Builder for configuring and constructing an instance of the Downloader. */ 128 public static class Builder { 129 private final Map<String, UrlEngine> urlEngineMap = new HashMap<>(); 130 private Executor ioExecutor; 131 private DownloaderLogger logger; 132 private ConnectivityHandler connectivityHandler; 133 private int maxConcurrentDownloads = 3; 134 135 /** 136 * Creates a downloader builder. 137 * 138 * <p>Note that all parameters are required except for {@link #withMaxConcurrentDownloads}. 139 */ Builder()140 public Builder() {} 141 142 /** 143 * Specifies the executor to use internally for I/O. 144 * 145 * <p>I/O operations can block so don't use a direct executor or one that runs on the main 146 * thread. 147 */ withIOExecutor(Executor ioExecutor)148 public Builder withIOExecutor(Executor ioExecutor) { 149 this.ioExecutor = ioExecutor; 150 return this; 151 } 152 153 /** 154 * Sets the {@link ConnectivityHandler} to use in order to determine if connectivity is 155 * sufficient, and if not, how to handle it. 156 */ withConnectivityHandler(ConnectivityHandler connectivityHandler)157 public Builder withConnectivityHandler(ConnectivityHandler connectivityHandler) { 158 this.connectivityHandler = connectivityHandler; 159 return this; 160 } 161 162 /** 163 * Limits the number of downloads in flight at a time. If a download request arrives that would 164 * exceed this limit, it will be queued until one already in flight completes. Beware that a 165 * download that is waiting for connectivity requirements is still considered to be in flight, 166 * so it is possible to saturate the downloader with requests waiting on connectivity 167 * requirements if the number of concurrent downloads isn't set high enough. 168 * 169 * <p>Note that other factors might restrict download concurrency even further, for instance the 170 * number of threads on the I/O executor when using a blocking engine. 171 */ withMaxConcurrentDownloads(int maxConcurrentDownloads)172 public Builder withMaxConcurrentDownloads(int maxConcurrentDownloads) { 173 checkArgument(maxConcurrentDownloads > 0); 174 this.maxConcurrentDownloads = maxConcurrentDownloads; 175 return this; 176 } 177 178 /** 179 * Adds an {@link UrlEngine} to handle network requests for the given URL scheme. Note that the 180 * engine must support the provided scheme, and only one engine may ever be registered for a 181 * specific URL scheme. An {@link IllegalArgumentException} will be thrown if the engine doesn't 182 * support the scheme or if an engine is already registered for the scheme. 183 */ addUrlEngine(String scheme, UrlEngine urlEngine)184 public Builder addUrlEngine(String scheme, UrlEngine urlEngine) { 185 checkArgument( 186 urlEngine.supportedSchemes().contains(scheme), 187 "Provided UrlEngine must support URL scheme: %s", 188 scheme); 189 checkArgument( 190 !urlEngineMap.containsKey(scheme), 191 "Requested scheme already has a UrlEngine registered: %s", 192 scheme); 193 urlEngineMap.put(scheme, urlEngine); 194 return this; 195 } 196 197 /** 198 * Adds an {@link UrlEngine} to handle network requests for the given URL schemes. Note that the 199 * engine must support the provided schemes, and only one engine may ever be registered for a 200 * specific URL scheme. An error will be thrown if the engine doesn't support the schemes or if 201 * an engine is already registered for the schemes. 202 */ addUrlEngine(Iterable<String> schemes, UrlEngine urlEngine)203 public Builder addUrlEngine(Iterable<String> schemes, UrlEngine urlEngine) { 204 for (String scheme : schemes) { 205 addUrlEngine(scheme, urlEngine); 206 } 207 return this; 208 } 209 210 /** Sets the {@link DownloaderLogger} to use for logging in this downloader instance. */ withLogger(DownloaderLogger logger)211 public Builder withLogger(DownloaderLogger logger) { 212 this.logger = logger; 213 return this; 214 } 215 build()216 public Downloader build() { 217 return new Downloader(this); 218 } 219 } 220 221 /** 222 * Value class for capturing a state snapshot of the downloader, for use in the state callbacks 223 * that can be registered via {@link #registerStateChangeCallback}. 224 */ 225 @AutoValue 226 public abstract static class State { 227 /** 228 * Returns the current number of downloads currently in flight, i.e. the number of downloads 229 * that are concurrently executing, or attempting to make progress on their underlying network 230 * stack. Note that this number should be in the range of [0, maxConcurrentDownloads), as 231 * configured by {@link Builder#withMaxConcurrentDownloads}. 232 */ getNumDownloadsInFlight()233 public abstract int getNumDownloadsInFlight(); 234 235 /** 236 * Returns the number of downloads that have been requested via {@link #execute} but not yet 237 * started. They may not have been started due to internal asynchronous code, due to waiting on 238 * connectivity requirements, or due to the limit enforced by {@code maxConcurrentDownloads}. 239 */ getNumQueuedDownloads()240 public abstract int getNumQueuedDownloads(); 241 242 /** 243 * Returns the current number of downloads that are waiting for sufficient connectivity 244 * conditions to be met. These downloads will start running once the device network conditions 245 * change (e.g. connects to WiFi), if there is enough space as determined by {@code 246 * maxConcurrentDownloads}. 247 * 248 * <p>Note that this number should be in the range of [0, numQueuedDownloads], as a download 249 * pending connectivity is necessarily queued. However, a download may also be queued due to 250 * {@code maxConcurrentDownloads}. 251 */ getNumDownloadsPendingConnectivity()252 public abstract int getNumDownloadsPendingConnectivity(); 253 254 /** Creates an instance of the state object. Internal-only. */ 255 @VisibleForTesting create( int numDownloadsInFlight, int numQueuedDownloads, int numDownloadsPendingConnectivity)256 public static State create( 257 int numDownloadsInFlight, int numQueuedDownloads, int numDownloadsPendingConnectivity) { 258 return new AutoValue_Downloader_State( 259 numDownloadsInFlight, numQueuedDownloads, numDownloadsPendingConnectivity); 260 } 261 } 262 263 /** 264 * Functional callback interface for observing changes to Downloader {@link State}. Used with 265 * {@link #registerStateChangeCallback} and {@link #unregisterStateChangeCallback}. 266 * 267 * <p>Note that the callbacks could just use {@code java.util.function.Consumer} in contexts that 268 * support the Java 8 SDK, and {@code com.google.common.base.Receiver} in contexts that don't but 269 * have access to Guava APIs that aren't made public. 270 */ 271 public interface StateChangeCallback { onStateChange(State state)272 void onStateChange(State state); 273 } 274 275 private final ImmutableMap<String, UrlEngine> urlEngineMap; 276 private final Executor ioExecutor; 277 private final DownloaderLogger logger; 278 private final ConnectivityHandler connectivityHandler; 279 private final int maxConcurrentDownloads; 280 281 @GuardedBy("lock") 282 private final IdentityHashMap<StateChangeCallback, Executor> stateCallbackMap = 283 new IdentityHashMap<>(); 284 285 // TODO: Consider using a PriorityQueue here instead, so that queued downloads can 286 // retain the order in the queue as they get added/removed due to waiting on connectivity. 287 @GuardedBy("lock") 288 private final Queue<QueuedDownload> queuedDownloads = new ArrayDeque<>(); 289 290 @GuardedBy("lock") 291 private final List<FluentFuture<DownloadResult>> unresolvedFutures = new ArrayList<>(); 292 293 private final Object lock = new Object(); 294 295 @GuardedBy("lock") 296 private int numDownloadsInFlight = 0; 297 298 @GuardedBy("lock") 299 private int numDownloadsPendingConnectivity = 0; 300 Downloader(Builder builder)301 private Downloader(Builder builder) { 302 ImmutableMap<String, UrlEngine> urlEngineMap = ImmutableMap.copyOf(builder.urlEngineMap); 303 checkArgument(!urlEngineMap.isEmpty(), "Must have at least one UrlEngine"); 304 checkArgument(builder.ioExecutor != null, "Must set a callback executor"); 305 checkArgument(builder.logger != null, "Must set a logger"); 306 checkArgument(builder.connectivityHandler != null, "Must set a connectivity handler"); 307 308 this.urlEngineMap = urlEngineMap; 309 this.ioExecutor = builder.ioExecutor; 310 this.logger = builder.logger; 311 this.connectivityHandler = builder.connectivityHandler; 312 this.maxConcurrentDownloads = builder.maxConcurrentDownloads; 313 } 314 315 /** 316 * Creates a new {@link DownloadRequest.Builder} for the given {@link URI} and {@link 317 * DownloadDestination}. The builder may be used to further customize the request. To execute the 318 * request, pass the built request to {@link #execute}. 319 */ 320 @CheckReturnValue newRequestBuilder(URI uri, DownloadDestination destination)321 public DownloadRequest.Builder newRequestBuilder(URI uri, DownloadDestination destination) { 322 return DownloadRequest.newBuilder() 323 .setDestination(destination) 324 .setUri(uri) 325 .setDownloadConstraints(DownloadConstraints.NETWORK_CONNECTED); 326 } 327 328 /** 329 * Executes the provided request. The request will be handled by the underlying {@link UrlEngine} 330 * and the result is streamed to the {@link DownloadDestination} created for the request. The 331 * download result is provided asynchronously as a {@link FluentFuture} that resolves when the 332 * download is complete. 333 * 334 * <p>The download can be cancelled by calling {@link FluentFuture#cancel} on the future instance 335 * returned by this method. Cancellation is best-effort and does not guarantee that the download 336 * will stop immediately, as it is impossible to stop a thread that is in the middle of reading 337 * bytes off the network. 338 * 339 * <p>Note that this method is not idempotent! The downloader does not attempt to merge/de-dupe 340 * incoming requests, even if the same exact request is executed twice. The calling code needs to 341 * be careful to manage its downloads in such a way that duplicated downloads don't occur. 342 * 343 * <p>TODO: Document what types of exceptions can be set on the returned future, and how clients 344 * are expected to handle them. 345 */ 346 @CheckReturnValue execute(DownloadRequest request)347 public FluentFuture<DownloadResult> execute(DownloadRequest request) { 348 FluentFuture<DownloadResult> resultFuture; 349 350 synchronized (lock) { 351 ClosingFuture<DownloadResult> closingFuture = enqueueRequest(request); 352 closingFuture.statusFuture().addListener(() -> onDownloadComplete(request), ioExecutor); 353 354 resultFuture = closingFuture.finishToFuture(); 355 unresolvedFutures.add(resultFuture); 356 resultFuture.addListener( 357 () -> { 358 synchronized (lock) { 359 unresolvedFutures.remove(resultFuture); 360 } 361 }, 362 MoreExecutors.directExecutor()); 363 } 364 365 logger.logFine("New request enqueued, running queue: %s", request.uri()); 366 maybeRunQueuedDownloads(); 367 return resultFuture; 368 } 369 370 /** 371 * Cancels <strong>all</strong> ongoing downloads. This will result in all unresolved {@link 372 * FluentFuture} instances returned by {@link #execute} to be cancelled immediately and have their 373 * error callbacks invoked. 374 * 375 * <p>Because implementations of {@link FluentFuture} allow callbacks to be garbage collected 376 * after the future is resolved, calling {@code cancelAll} is an effective way to avoid having the 377 * Downloader leak memory after it is logically no longer needed, e.g. if it is only used from an 378 * Android {@link android.app.Activity}, and that Activity is destroyed. 379 * 380 * <p>However, in general the Downloader should be run from a process-level context, e.g. in an 381 * Android {@link android.app.Service}, so that the downloader doesn't implicitly hold on to 382 * UI-scoped objects. 383 */ cancelAll()384 public void cancelAll() { 385 List<FluentFuture<DownloadResult>> unresolvedFuturesCopy; 386 387 synchronized (lock) { 388 // Copy the set of unresolved futures to a local variable to avoid hitting 389 // ConcurrentModificationExceptions, which could happen since canceling the future may 390 // trigger the future callback in execute() that removes the future from the set of 391 // unresolved futures. 392 unresolvedFuturesCopy = new ArrayList<>(unresolvedFutures); 393 unresolvedFutures.clear(); 394 } 395 396 for (FluentFuture<DownloadResult> unresolvedFuture : unresolvedFuturesCopy) { 397 unresolvedFuture.cancel(true); 398 } 399 } 400 401 /** 402 * Registers an {@link StateChangeCallback} with this downloader instance, to be run when various 403 * state changes occur. The callback will be executed on the provided {@link Executor}. Use {@link 404 * #unregisterStateChangeCallback} to unregister a previously registered callback. The callback is 405 * invoked when the following state transitions occur: 406 * 407 * <ul> 408 * <li>A download was requested via a call to {@link #execute} 409 * <li>A download completed 410 * <li>A download started waiting for its connectivity constraints to be satisfied 411 * <li>A download stopped waiting for its connectivity constraints to be satisfied 412 * </ul> 413 * 414 * <p>A newly registered callback instance will only called for state changes that are triggered 415 * after the registration; there is no mechanism to replay previous state changes on the callback. 416 * 417 * <p>Registering the same callback multiple times has no effect, except that it will overwrite 418 * the {@link Executor} that was used in a previous registration call. 419 * 420 * <p>Invocation of the callbacks will be internally serialized to avoid concurrent invocations of 421 * the callback with possibly conflicting state. A new invocation of the callback will not start 422 * running until the previous one has completed. If multiple callbacks are being registered that 423 * must be synchronized with each other, then the caller must take care to coordinate this 424 * externally, such as locking in the callbacks or using an executor that guarantees 425 * serialization, such as {@link MoreExecutors#newSequentialExecutor}. 426 * 427 * <p>Warning: Do not use {@link MoreExecutors#directExecutor} or similar executor mechanisms, as 428 * doing so can easily lead to a deadlock, since the internal downloader lock is held while 429 * scheduling the callbacks on the provided executor. 430 */ registerStateChangeCallback(StateChangeCallback callback, Executor executor)431 public void registerStateChangeCallback(StateChangeCallback callback, Executor executor) { 432 synchronized (lock) { 433 stateCallbackMap.put(callback, MoreExecutors.newSequentialExecutor(executor)); 434 } 435 } 436 437 /** 438 * Unregisters a previously registered {@link StateChangeCallback} with this downloader instance. 439 * 440 * <p>Attempting to unregister a callback that was never registered is a no-op. 441 */ unregisterStateChangeCallback(StateChangeCallback callback)442 public void unregisterStateChangeCallback(StateChangeCallback callback) { 443 synchronized (lock) { 444 stateCallbackMap.remove(callback); 445 } 446 } 447 onDownloadComplete(DownloadRequest request)448 private void onDownloadComplete(DownloadRequest request) { 449 synchronized (lock) { 450 // The number of downloads should be well-balanced and this case should never 451 // trigger, so this is just a defensive check. 452 checkState( 453 numDownloadsInFlight >= 0, "Encountered < 0 downloads in flight, this shouldn't happen"); 454 } 455 456 logger.logFine("Download complete, running queued downloads: %s", request.uri()); 457 maybeRunQueuedDownloads(); 458 } 459 maybeRunQueuedDownloads()460 private void maybeRunQueuedDownloads() { 461 // Loop until we run out of download slots or out of queued downloads. It should be impossible 462 // for this to loop forever. Also, note that the synchronized block is inside the loop, since 463 // the outer loop conditions don't touch shared mutable state. 464 while (true) { 465 synchronized (lock) { 466 if (numDownloadsInFlight >= maxConcurrentDownloads) { 467 logger.logInfo("Exceeded max concurrent downloads, not running another queued request"); 468 return; 469 } 470 471 QueuedDownload queuedDownload = queuedDownloads.poll(); 472 if (queuedDownload == null) { 473 return; 474 } 475 476 DownloadRequest request = queuedDownload.request(); 477 ListenableFuture<Void> connectivityFuture = checkConnectivity(request); 478 if (connectivityFuture.isDone()) { 479 logger.logFine("Connectivity satisfied; running request. uri=%s", request.uri()); 480 numDownloadsInFlight++; 481 ListenableFuture<?> statusFuture = queuedDownload.resultFuture().statusFuture(); 482 statusFuture.addListener( 483 () -> { 484 synchronized (lock) { 485 numDownloadsInFlight--; 486 487 // One less download in flight, let the state change listeners know. 488 runStateCallbacks(); 489 } 490 491 // A download slot was just freed up, run queued downloads again. 492 logger.logFine( 493 "Queued download completed, running queued downloads: %s", request.uri()); 494 maybeRunQueuedDownloads(); 495 }, 496 ioExecutor); 497 498 // A new download is about to be in flight, let state callbacks know about the 499 // state change. 500 runStateCallbacks(); 501 queuedDownload.task().run(); 502 } else { 503 logger.logInfo("Waiting on connectivity for request: uri=%s", request.uri()); 504 handlePendingConnectivity(connectivityFuture, queuedDownload); 505 } 506 } 507 } 508 } 509 510 @GuardedBy("lock") handlePendingConnectivity( ListenableFuture<Void> connectivityFuture, QueuedDownload queuedDownload)511 private void handlePendingConnectivity( 512 ListenableFuture<Void> connectivityFuture, QueuedDownload queuedDownload) { 513 // Keep track of the number of requests waiting. 514 numDownloadsPendingConnectivity++; 515 connectivityFuture.addListener( 516 () -> { 517 synchronized (lock) { 518 numDownloadsPendingConnectivity--; 519 520 // Let the listeners know we are no longer waiting. 521 runStateCallbacks(); 522 } 523 }, 524 MoreExecutors.directExecutor()); 525 526 // Let the listeners know we're waiting. 527 runStateCallbacks(); 528 529 Futures.addCallback( 530 connectivityFuture, 531 new FutureCallback<Void>() { 532 @Override 533 public void onSuccess(Void result1) { 534 logger.logInfo("Connectivity changed, running queued requests"); 535 requeue(queuedDownload); 536 } 537 538 @Override 539 public void onFailure(Throwable t) { 540 if (t instanceof TimeoutException) { 541 logger.logInfo("Timed out waiting for connectivity change"); 542 requeue(queuedDownload); 543 } else if (t instanceof CancellationException) { 544 logger.logFine("Connectivity future cancelled, running queued downloads"); 545 maybeRunQueuedDownloads(); 546 } else { 547 logger.logError(t, "Error observing connectivity changes"); 548 cancelAll(); 549 } 550 } 551 }, 552 ioExecutor); 553 554 // Run state callbacks and cancel the connectivity future when the result task resolves. 555 // It doesn't matter if it succeeded or failed, either way it means we no longer need to wait 556 // for connectivity. 557 queuedDownload 558 .task() 559 .addListener( 560 () -> { 561 synchronized (lock) { 562 logger.logInfo("Queued task completed, cancelling connectivity check"); 563 runStateCallbacks(); 564 connectivityFuture.cancel(false); 565 } 566 }, 567 MoreExecutors.directExecutor()); 568 } 569 requeue(QueuedDownload queuedDownload)570 private void requeue(QueuedDownload queuedDownload) { 571 synchronized (lock) { 572 addToQueue(queuedDownload); 573 } 574 575 logger.logInfo( 576 "Requeuing download after connectivity change: %s", queuedDownload.request().uri()); 577 maybeRunQueuedDownloads(); 578 } 579 enqueueRequest(DownloadRequest request)580 private ClosingFuture<DownloadResult> enqueueRequest(DownloadRequest request) { 581 synchronized (lock) { 582 ListenableFutureTask<Void> task = ListenableFutureTask.create(() -> null); 583 // When the task runs (i.e. is taken off the queue and is explicitly run), all pre-flight 584 // checks should have been made, so at that point the request is send to the underlying 585 // network stack for execution. 586 ClosingFuture<DownloadResult> resultFuture = 587 ClosingFuture.from(task) 588 .transformAsync((closer, result) -> runRequest(request), ioExecutor); 589 addToQueue(QueuedDownload.create(request, task, resultFuture)); 590 return resultFuture; 591 } 592 } 593 594 @GuardedBy("lock") addToQueue(QueuedDownload queuedDownload)595 private void addToQueue(QueuedDownload queuedDownload) { 596 queuedDownloads.add(queuedDownload); 597 598 // Make sure that when the task completes, the queued download is definitely removed 599 // from the queue. This is necessary to be robust in the face of cancellation, as 600 // canceled tasks may not get removed from the queue otherwise. 601 queuedDownload 602 .task() 603 .addListener( 604 () -> { 605 synchronized (lock) { 606 if (queuedDownloads.remove(queuedDownload)) { 607 // If the queued download was actually removed, update the state callbacks 608 // to reflect the state change. 609 runStateCallbacks(); 610 } 611 } 612 }, 613 MoreExecutors.directExecutor()); 614 615 // Now that a new request is on the queue, run the state callbacks. 616 runStateCallbacks(); 617 } 618 runRequest(DownloadRequest request)619 private ClosingFuture<DownloadResult> runRequest(DownloadRequest request) throws IOException { 620 URI uri = request.uri(); 621 UrlEngine urlEngine = checkNotNull(urlEngineMap.get(uri.getScheme())); 622 UrlRequest.Builder urlRequestBuilder = urlEngine.createRequest(uri.toString()); 623 for (Map.Entry<String, String> entry : request.headers().entries()) { 624 urlRequestBuilder.addHeader(entry.getKey(), entry.getValue()); 625 } 626 627 long numExistingBytes = request.destination().numExistingBytes(); 628 if (numExistingBytes > 0) { 629 logger.logInfo( 630 "Existing bytes found. numExistingBytes=%d, uri=%s", numExistingBytes, request.uri()); 631 urlRequestBuilder.addHeader(HttpHeaders.RANGE, "bytes=" + numExistingBytes + "-"); 632 633 DownloadMetadata destinationMetadata = request.destination().readMetadata(); 634 String contentTag = destinationMetadata.getContentTag(); 635 long lastModifiedTimeSeconds = destinationMetadata.getLastModifiedTimeSeconds(); 636 if (!contentTag.isEmpty()) { 637 urlRequestBuilder.addHeader(HttpHeaders.IF_RANGE, contentTag); 638 } else if (lastModifiedTimeSeconds > 0) { 639 urlRequestBuilder.addHeader( 640 HttpHeaders.IF_RANGE, formatRfc1123Date(lastModifiedTimeSeconds)); 641 } else { 642 // TODO: This should probably just clear the destination and remove the Range 643 // header so there's no chance of data corruption. Leaving this as-is for now to 644 // keep supporting range requests for offline maps. 645 logger.logWarning( 646 "Sending range request without If-Range header, due to missing destination " 647 + "metadata. Data corruption is possible."); 648 } 649 } 650 651 ListenableFuture<UrlRequest> urlRequestFuture; 652 OAuthTokenProvider oAuthTokenProvider = request.oAuthTokenProvider(); 653 if (oAuthTokenProvider != null) { 654 urlRequestFuture = 655 Futures.transform( 656 oAuthTokenProvider.provideOAuthToken(uri), 657 authToken -> { 658 if (authToken != null) { 659 urlRequestBuilder.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + authToken); 660 } 661 return urlRequestBuilder.build(); 662 }, 663 ioExecutor); 664 } else { 665 urlRequestFuture = Futures.immediateFuture(urlRequestBuilder.build()); 666 } 667 668 return ClosingFuture.from(urlRequestFuture) 669 .transform((closer, urlRequest) -> checkNotNull(urlRequest).send(), ioExecutor) 670 .transformAsync( 671 (closer, responseFuture) -> completeRequest(request, checkNotNull(responseFuture)), 672 ioExecutor); 673 } 674 675 /** 676 * @param request the original request that triggered this call 677 * @param responseFuture the {@link UrlResponse}, provided asynchronously via a {@link 678 * ListenableFuture} 679 */ completeRequest( DownloadRequest request, ListenableFuture<UrlResponse> responseFuture)680 private ClosingFuture<DownloadResult> completeRequest( 681 DownloadRequest request, ListenableFuture<UrlResponse> responseFuture) { 682 return ClosingFuture.from(responseFuture) 683 .transformAsync( 684 (closer, urlResponse) -> { 685 checkNotNull(urlResponse); 686 // We want to close the response regardless of whether we succeed or fail. 687 closer.eventuallyClose(urlResponse, ioExecutor); 688 logger.logFine( 689 "Got URL response, starting to read response body. uri=%s", request.uri()); 690 DownloadDestination destination = request.destination(); 691 if (request.headers().containsKey(HttpHeaders.RANGE) 692 && checkNotNull(urlResponse).getResponseCode() != HTTP_PARTIAL_CONTENT) { 693 logger.logFine("Clearing %s as our range request wasn't honored", destination); 694 destination.clear(); 695 } 696 WritableByteChannel byteChannel = 697 destination.openByteChannel( 698 parseResponseStartOffset(urlResponse), parseResponseMetadata(urlResponse)); 699 closer.eventuallyClose(byteChannel, ioExecutor); 700 return ClosingFuture.from(checkNotNull(urlResponse).readResponseBody(byteChannel)); 701 }, 702 ioExecutor) 703 .catching( 704 RequestException.class, 705 (closer, requestException) -> { 706 if (checkNotNull(requestException).getErrorDetails().getHttpStatusCode() 707 == HTTP_RANGE_NOT_SATISFIABLE) { 708 // This is a bit of a special edge case. Encountering this error means the server 709 // rejected our HTTP range request because it was outside the range of available 710 // bytes. This may well mean that the request was malformed (e.g. data on disk was 711 // corrupted and the existing file size ended up larger than what exists on the 712 // server). But the more common cause for this error is that the entire file was 713 // in fact already downloaded, so the requested range would cover 0 bytes, which 714 // the server interprets as not satisfiable. To mitigate that case we simply return 715 // a success state here by indicating 0 bytes were downloaded. 716 717 // This isn't exactly ideal, as it means that a potential class of errors will 718 // go unnoticed. A better solution might find a way to distinguish between the 719 // common, file-already-downloaded case and the file corrupted case. This solution 720 // is put in place mainly to retain parity with the older downloader implementation. 721 return 0L; 722 } else { 723 throw new DownloadException(requestException); 724 } 725 }, 726 ioExecutor) 727 .transform( 728 (closer, bytesWritten) -> { 729 logger.logFine( 730 "Response body written. bytesWritten=%d, uri=%s", 731 checkNotNull(bytesWritten), request.uri()); 732 return DownloadResult.create(checkNotNull(bytesWritten)); 733 }, 734 ioExecutor) 735 .catchingAsync( 736 Exception.class, 737 (closer, exception) -> { 738 ClosingFuture<DownloadResult> result; 739 synchronized (lock) { 740 logger.logWarning( 741 exception, "Error reading download result. uri=%s", request.uri()); 742 RequestException requestException = getRequestException(exception); 743 if (requestException != null 744 && requestException.getErrorDetails().isRetryableAsIs()) { 745 // Retry the request by just re-enqueueing it. Note that we also need to 746 // call maybeRunQueuedRequest to keep the queue moving. Also, in this particular 747 // case we need to decrement the in-flight downloads count, as we are taking a 748 // previously in-flight download and putting it back in the queue without 749 // resolving the underlying result future. 750 numDownloadsInFlight--; 751 result = enqueueRequest(request); 752 } else { 753 throw new DownloadException(exception); 754 } 755 } 756 757 logger.logInfo("Running queued downloads after handling request exception"); 758 maybeRunQueuedDownloads(); 759 return result; 760 }, 761 ioExecutor); 762 } 763 764 @GuardedBy("lock") checkConnectivity(DownloadRequest request)765 private ListenableFuture<Void> checkConnectivity(DownloadRequest request) { 766 if (!SCHEMES_REQUIRING_CONNECTIVITY.contains(request.uri().getScheme())) { 767 return Futures.immediateVoidFuture(); 768 } 769 770 return connectivityHandler.checkConnectivity(request.downloadConstraints()); 771 } 772 773 @GuardedBy("lock") runStateCallbacks()774 private void runStateCallbacks() { 775 State state = 776 State.create(numDownloadsInFlight, queuedDownloads.size(), numDownloadsPendingConnectivity); 777 for (Map.Entry<StateChangeCallback, Executor> callbackEntry : stateCallbackMap.entrySet()) { 778 callbackEntry.getValue().execute(() -> callbackEntry.getKey().onStateChange(state)); 779 } 780 } 781 formatRfc1123Date(long unixTimeSeconds)782 private static String formatRfc1123Date(long unixTimeSeconds) { 783 synchronized (SIMPLE_DATE_FORMAT_LOCK) { 784 return RFC_1123_FORMATTER.format(new Date(SECONDS.toMillis(unixTimeSeconds))); 785 } 786 } 787 parseResponseStartOffset(UrlResponse response)788 private static long parseResponseStartOffset(UrlResponse response) throws DownloadException { 789 if (response.getResponseCode() != HttpURLConnection.HTTP_PARTIAL) { 790 return 0; 791 } 792 793 List<String> contentRangeHeaders = response.getResponseHeaders().get(HttpHeaders.CONTENT_RANGE); 794 795 checkDownloadState( 796 contentRangeHeaders != null && !contentRangeHeaders.isEmpty(), 797 "Host returned 206/PARTIAL response code but didn't provide a " 798 + "'Content-Range' response header"); 799 String contentRangeHeader = checkNotNull(contentRangeHeaders).get(0); 800 Matcher matcher = CONTENT_RANGE_HEADER_PATTERN.matcher(contentRangeHeader); 801 checkDownloadState( 802 matcher.matches() && matcher.groupCount() > 0, 803 "Content-Range response header didn't match expected pattern. " + "Was '%s', expected '%s'", 804 contentRangeHeader, 805 CONTENT_RANGE_HEADER_PATTERN.pattern()); 806 return Long.parseLong(checkNotNull(matcher.group(1))); 807 } 808 parseResponseMetadata(UrlResponse response)809 private static DownloadMetadata parseResponseMetadata(UrlResponse response) 810 throws DownloadException { 811 String contentTag = parseResponseContentTag(response); 812 long lastModifiedTimeSeconds = parseResponseModifiedTime(response); 813 return DownloadMetadata.create(contentTag, lastModifiedTimeSeconds); 814 } 815 parseResponseModifiedTime(UrlResponse response)816 private static long parseResponseModifiedTime(UrlResponse response) throws DownloadException { 817 List<String> lastModifiedHeaders = response.getResponseHeaders().get(HttpHeaders.LAST_MODIFIED); 818 if (lastModifiedHeaders == null || lastModifiedHeaders.isEmpty()) { 819 return 0L; 820 } 821 822 String lastModifiedHeader = lastModifiedHeaders.get(0); 823 Date date; 824 825 try { 826 synchronized (SIMPLE_DATE_FORMAT_LOCK) { 827 date = RFC_1123_FORMATTER.parse(lastModifiedHeader); 828 } 829 if (date == null) { 830 throw new DownloadException("Invalid Last-Modified header: " + lastModifiedHeader); 831 } 832 } catch (ParseException e) { 833 throw new DownloadException("Invalid Last-Modified header: " + lastModifiedHeader, e); 834 } 835 836 return MILLISECONDS.toSeconds(date.getTime()); 837 } 838 parseResponseContentTag(UrlResponse response)839 private static String parseResponseContentTag(UrlResponse response) { 840 List<String> contentTagHeaders = response.getResponseHeaders().get(HttpHeaders.ETAG); 841 if (contentTagHeaders == null || contentTagHeaders.isEmpty()) { 842 return ""; 843 } 844 845 return contentTagHeaders.get(0); 846 } 847 848 @AutoValue 849 abstract static class QueuedDownload { request()850 abstract DownloadRequest request(); 851 task()852 abstract ListenableFutureTask<?> task(); 853 resultFuture()854 abstract ClosingFuture<DownloadResult> resultFuture(); 855 create( DownloadRequest request, ListenableFutureTask<?> task, ClosingFuture<DownloadResult> resultFuture)856 static QueuedDownload create( 857 DownloadRequest request, 858 ListenableFutureTask<?> task, 859 ClosingFuture<DownloadResult> resultFuture) { 860 return new AutoValue_Downloader_QueuedDownload(request, task, resultFuture); 861 } 862 } 863 864 @VisibleForTesting 865 @Nullable getRequestException(@ullable Throwable throwable)866 static RequestException getRequestException(@Nullable Throwable throwable) { 867 if (throwable == null) { 868 return null; 869 } else { 870 return (RequestException) 871 Iterables.find( 872 getCausalChain(throwable), 873 instanceOf(RequestException.class), 874 /* defaultValue= */ null); 875 } 876 } 877 878 @FormatMethod checkDownloadState( boolean state, @FormatString String message, Object... args)879 private static void checkDownloadState( 880 boolean state, @FormatString String message, Object... args) throws DownloadException { 881 if (!state) { 882 throw new DownloadException(String.format(message, args)); 883 } 884 } 885 } 886