• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 package com.google.android.downloader;
16 
17 import static com.google.common.base.Preconditions.checkArgument;
18 import static com.google.common.base.Preconditions.checkNotNull;
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.google.common.base.Predicates.instanceOf;
21 import static com.google.common.base.Throwables.getCausalChain;
22 import static java.util.concurrent.TimeUnit.MILLISECONDS;
23 import static java.util.concurrent.TimeUnit.SECONDS;
24 
25 import com.google.auto.value.AutoValue;
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.collect.ImmutableSet;
29 import com.google.common.collect.Iterables;
30 import com.google.common.net.HttpHeaders;
31 import com.google.common.util.concurrent.ClosingFuture;
32 import com.google.common.util.concurrent.FluentFuture;
33 import com.google.common.util.concurrent.FutureCallback;
34 import com.google.common.util.concurrent.Futures;
35 import com.google.common.util.concurrent.ListenableFuture;
36 import com.google.common.util.concurrent.ListenableFutureTask;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.errorprone.annotations.CheckReturnValue;
39 import com.google.errorprone.annotations.FormatMethod;
40 import com.google.errorprone.annotations.FormatString;
41 import com.google.errorprone.annotations.concurrent.GuardedBy;
42 import java.io.IOException;
43 import java.net.HttpURLConnection;
44 import java.net.URI;
45 import java.nio.channels.WritableByteChannel;
46 import java.text.ParseException;
47 import java.text.SimpleDateFormat;
48 import java.util.ArrayDeque;
49 import java.util.ArrayList;
50 import java.util.Date;
51 import java.util.HashMap;
52 import java.util.IdentityHashMap;
53 import java.util.List;
54 import java.util.Locale;
55 import java.util.Map;
56 import java.util.Queue;
57 import java.util.TimeZone;
58 import java.util.concurrent.CancellationException;
59 import java.util.concurrent.Executor;
60 import java.util.concurrent.TimeoutException;
61 import java.util.regex.Matcher;
62 import java.util.regex.Pattern;
63 import javax.annotation.Nullable;
64 
65 /**
66  * In-process download system. Provides a light-weight mechanism to download resources over the
67  * network. Downloader includes the following features:
68  *
69  * <ul>
70  *   <li>Configurable choice of network stack, with several defaults available out of the box.
71  *   <li>Fully asynchronous behavior, allowing downloads to progress and complete without blocking
72  *       (assuming the underlying network stack can avoid blocking).
73  *   <li>Support for HTTP range requests, allowing partial downloads to resume mid-way through, and
74  *       avoiding redownloading of fully-downloaded requests.
75  *   <li>Detection of network interruptions, and a configurable way to retry requests that have
76  *       failed after losing connectivity.
77  * </ul>
78  *
79  * <p>Note that because this library performs downloads in-process, it is subject to having
80  * downloads stall or abort when the app is suspended or killed. Download requests only live in
81  * memory, and thus are lost when the process ends. Library users should persist the set of
82  * downloads to be executed in persistent storage, such as a SQLite database, and run the downloads
83  * in the context of a persistent operation (either a foreground service or via Android's {@link
84  * android.app.job.JobScheduler} mechanism).
85  *
86  * <p>This is intended as a functional (but not drop-in) replacement for Android's {@link
87  * android.app.DownloadManager}, which has a number of issues:
88  *
89  * <ul>
90  *   <li>It relies on the network stack built into Android, and thus cannot be patched to receive
91  *       updates. For older versions of Android, that means the DownloadManager is vulnerable to
92  *       issues in the network stack, such as b/18432707, which can result in MITM attacks.
93  *   <li>When downloading to external storage on older versions of Android (via {@link
94  *       android.app.DownloadManager.Request#setDestinationInExternalFilesDir} or {@link
95  *       android.app.DownloadManager.Request#setDestinationInExternalPublicDir}), downloads may
96  *       exceed the maximum size of the download directory (see b/22605830).
97  *   <li>Downloads may pause and never resume again without external interference (b/18110151)
98  *   <li>The DownloadManager may lose track of some files (b/18265497)
99  * </ul>
100  *
101  * <p>This library mitigates these issues by performing downloads in-process over the app-provided
102  * network stack, thus ensuring that an up-to-date network stack is used, and handing off storage
103  * management directly to the app without any additional restrictions.
104  */
105 public class Downloader {
/** HTTP status code 416 (Range Not Satisfiable). */
@VisibleForTesting static final int HTTP_RANGE_NOT_SATISFIABLE = 416;

/** HTTP status code 206 (Partial Content). */
@VisibleForTesting static final int HTTP_PARTIAL_CONTENT = 206;

// URL schemes that are treated as needing network connectivity. NOTE(review): the code that
// consults this set is not part of this excerpt.
private static final ImmutableSet<String> SCHEMES_REQUIRING_CONNECTIVITY =
    ImmutableSet.of("http", "https");

// Matches values of the form "bytes <first>-<last>/<total>", where <total> may also be "*".
// The three capture groups are the first byte, the last byte and the total (or "*").
private static final Pattern CONTENT_RANGE_HEADER_PATTERN =
    Pattern.compile("bytes (\\d+)-(\\d+)/(\\d+|\\*)");

// SimpleDateFormat is not thread-safe, so every use of the shared formatter goes through this
// lock.
private static final Object SIMPLE_DATE_FORMAT_LOCK = new Object();

@GuardedBy("SIMPLE_DATE_FORMAT_LOCK")
private static final SimpleDateFormat RFC_1123_FORMATTER;

static {
  synchronized (SIMPLE_DATE_FORMAT_LOCK) {
    // The pattern emits a literal "GMT" suffix, so the formatter itself is pinned to UTC.
    RFC_1123_FORMATTER = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.US);
    RFC_1123_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC"));
  }
}
126 
127   /** Builder for configuring and constructing an instance of the Downloader. */
128   public static class Builder {
129     private final Map<String, UrlEngine> urlEngineMap = new HashMap<>();
130     private Executor ioExecutor;
131     private DownloaderLogger logger;
132     private ConnectivityHandler connectivityHandler;
133     private int maxConcurrentDownloads = 3;
134 
135     /**
136      * Creates a downloader builder.
137      *
138      * <p>Note that all parameters are required except for {@link #withMaxConcurrentDownloads}.
139      */
Builder()140     public Builder() {}
141 
142     /**
143      * Specifies the executor to use internally for I/O.
144      *
145      * <p>I/O operations can block so don't use a direct executor or one that runs on the main
146      * thread.
147      */
withIOExecutor(Executor ioExecutor)148     public Builder withIOExecutor(Executor ioExecutor) {
149       this.ioExecutor = ioExecutor;
150       return this;
151     }
152 
153     /**
154      * Sets the {@link ConnectivityHandler} to use in order to determine if connectivity is
155      * sufficient, and if not, how to handle it.
156      */
withConnectivityHandler(ConnectivityHandler connectivityHandler)157     public Builder withConnectivityHandler(ConnectivityHandler connectivityHandler) {
158       this.connectivityHandler = connectivityHandler;
159       return this;
160     }
161 
162     /**
163      * Limits the number of downloads in flight at a time. If a download request arrives that would
164      * exceed this limit, it will be queued until one already in flight completes. Beware that a
165      * download that is waiting for connectivity requirements is still considered to be in flight,
166      * so it is possible to saturate the downloader with requests waiting on connectivity
167      * requirements if the number of concurrent downloads isn't set high enough.
168      *
169      * <p>Note that other factors might restrict download concurrency even further, for instance the
170      * number of threads on the I/O executor when using a blocking engine.
171      */
withMaxConcurrentDownloads(int maxConcurrentDownloads)172     public Builder withMaxConcurrentDownloads(int maxConcurrentDownloads) {
173       checkArgument(maxConcurrentDownloads > 0);
174       this.maxConcurrentDownloads = maxConcurrentDownloads;
175       return this;
176     }
177 
178     /**
179      * Adds an {@link UrlEngine} to handle network requests for the given URL scheme. Note that the
180      * engine must support the provided scheme, and only one engine may ever be registered for a
181      * specific URL scheme. An {@link IllegalArgumentException} will be thrown if the engine doesn't
182      * support the scheme or if an engine is already registered for the scheme.
183      */
addUrlEngine(String scheme, UrlEngine urlEngine)184     public Builder addUrlEngine(String scheme, UrlEngine urlEngine) {
185       checkArgument(
186           urlEngine.supportedSchemes().contains(scheme),
187           "Provided UrlEngine must support URL scheme: %s",
188           scheme);
189       checkArgument(
190           !urlEngineMap.containsKey(scheme),
191           "Requested scheme already has a UrlEngine registered: %s",
192           scheme);
193       urlEngineMap.put(scheme, urlEngine);
194       return this;
195     }
196 
197     /**
198      * Adds an {@link UrlEngine} to handle network requests for the given URL schemes. Note that the
199      * engine must support the provided schemes, and only one engine may ever be registered for a
200      * specific URL scheme. An error will be thrown if the engine doesn't support the schemes or if
201      * an engine is already registered for the schemes.
202      */
addUrlEngine(Iterable<String> schemes, UrlEngine urlEngine)203     public Builder addUrlEngine(Iterable<String> schemes, UrlEngine urlEngine) {
204       for (String scheme : schemes) {
205         addUrlEngine(scheme, urlEngine);
206       }
207       return this;
208     }
209 
210     /** Sets the {@link DownloaderLogger} to use for logging in this downloader instance. */
withLogger(DownloaderLogger logger)211     public Builder withLogger(DownloaderLogger logger) {
212       this.logger = logger;
213       return this;
214     }
215 
build()216     public Downloader build() {
217       return new Downloader(this);
218     }
219   }
220 
221   /**
222    * Value class for capturing a state snapshot of the downloader, for use in the state callbacks
223    * that can be registered via {@link #registerStateChangeCallback}.
224    */
225   @AutoValue
226   public abstract static class State {
227     /**
228      * Returns the current number of downloads currently in flight, i.e. the number of downloads
229      * that are concurrently executing, or attempting to make progress on their underlying network
230      * stack. Note that this number should be in the range of [0, maxConcurrentDownloads), as
231      * configured by {@link Builder#withMaxConcurrentDownloads}.
232      */
getNumDownloadsInFlight()233     public abstract int getNumDownloadsInFlight();
234 
235     /**
236      * Returns the number of downloads that have been requested via {@link #execute} but not yet
237      * started. They may not have been started due to internal asynchronous code, due to waiting on
238      * connectivity requirements, or due to the limit enforced by {@code maxConcurrentDownloads}.
239      */
getNumQueuedDownloads()240     public abstract int getNumQueuedDownloads();
241 
242     /**
243      * Returns the current number of downloads that are waiting for sufficient connectivity
244      * conditions to be met. These downloads will start running once the device network conditions
245      * change (e.g. connects to WiFi), if there is enough space as determined by {@code
246      * maxConcurrentDownloads}.
247      *
248      * <p>Note that this number should be in the range of [0, numQueuedDownloads], as a download
249      * pending connectivity is necessarily queued. However, a download may also be queued due to
250      * {@code maxConcurrentDownloads}.
251      */
getNumDownloadsPendingConnectivity()252     public abstract int getNumDownloadsPendingConnectivity();
253 
254     /** Creates an instance of the state object. Internal-only. */
255     @VisibleForTesting
create( int numDownloadsInFlight, int numQueuedDownloads, int numDownloadsPendingConnectivity)256     public static State create(
257         int numDownloadsInFlight, int numQueuedDownloads, int numDownloadsPendingConnectivity) {
258       return new AutoValue_Downloader_State(
259           numDownloadsInFlight, numQueuedDownloads, numDownloadsPendingConnectivity);
260     }
261   }
262 
263   /**
264    * Functional callback interface for observing changes to Downloader {@link State}. Used with
265    * {@link #registerStateChangeCallback} and {@link #unregisterStateChangeCallback}.
266    *
267    * <p>Note that the callbacks could just use {@code java.util.function.Consumer} in contexts that
268    * support the Java 8 SDK, and {@code com.google.common.base.Receiver} in contexts that don't but
269    * have access to Guava APIs that aren't made public.
270    */
271   public interface StateChangeCallback {
onStateChange(State state)272     void onStateChange(State state);
273   }
274 
275   private final ImmutableMap<String, UrlEngine> urlEngineMap;
276   private final Executor ioExecutor;
277   private final DownloaderLogger logger;
278   private final ConnectivityHandler connectivityHandler;
279   private final int maxConcurrentDownloads;
280 
281   @GuardedBy("lock")
282   private final IdentityHashMap<StateChangeCallback, Executor> stateCallbackMap =
283       new IdentityHashMap<>();
284 
285   // TODO: Consider using a PriorityQueue here instead, so that queued downloads can
286   // retain the order in the queue as they get added/removed due to waiting on connectivity.
287   @GuardedBy("lock")
288   private final Queue<QueuedDownload> queuedDownloads = new ArrayDeque<>();
289 
290   @GuardedBy("lock")
291   private final List<FluentFuture<DownloadResult>> unresolvedFutures = new ArrayList<>();
292 
293   private final Object lock = new Object();
294 
295   @GuardedBy("lock")
296   private int numDownloadsInFlight = 0;
297 
298   @GuardedBy("lock")
299   private int numDownloadsPendingConnectivity = 0;
300 
/**
 * Creates a downloader from the configuration collected by the given builder.
 *
 * @param builder the builder holding the engines, executor, logger, connectivity handler and
 *     concurrency limit to use
 * @throws IllegalArgumentException if no {@link UrlEngine} was registered, or if the I/O
 *     executor, the logger or the connectivity handler is missing
 */
private Downloader(Builder builder) {
  // Snapshot the engines first so later changes to the builder cannot affect this instance.
  ImmutableMap<String, UrlEngine> engines = ImmutableMap.copyOf(builder.urlEngineMap);
  checkArgument(!engines.isEmpty(), "Must have at least one UrlEngine");
  // The executor checked here is the one supplied through Builder#withIOExecutor.
  checkArgument(builder.ioExecutor != null, "Must set an I/O executor");
  checkArgument(builder.logger != null, "Must set a logger");
  checkArgument(builder.connectivityHandler != null, "Must set a connectivity handler");

  this.urlEngineMap = engines;
  this.ioExecutor = builder.ioExecutor;
  this.logger = builder.logger;
  this.connectivityHandler = builder.connectivityHandler;
  this.maxConcurrentDownloads = builder.maxConcurrentDownloads;
}
314 
315   /**
316    * Creates a new {@link DownloadRequest.Builder} for the given {@link URI} and {@link
317    * DownloadDestination}. The builder may be used to further customize the request. To execute the
318    * request, pass the built request to {@link #execute}.
319    */
320   @CheckReturnValue
newRequestBuilder(URI uri, DownloadDestination destination)321   public DownloadRequest.Builder newRequestBuilder(URI uri, DownloadDestination destination) {
322     return DownloadRequest.newBuilder()
323         .setDestination(destination)
324         .setUri(uri)
325         .setDownloadConstraints(DownloadConstraints.NETWORK_CONNECTED);
326   }
327 
328   /**
329    * Executes the provided request. The request will be handled by the underlying {@link UrlEngine}
330    * and the result is streamed to the {@link DownloadDestination} created for the request. The
331    * download result is provided asynchronously as a {@link FluentFuture} that resolves when the
332    * download is complete.
333    *
334    * <p>The download can be cancelled by calling {@link FluentFuture#cancel} on the future instance
335    * returned by this method. Cancellation is best-effort and does not guarantee that the download
336    * will stop immediately, as it is impossible to stop a thread that is in the middle of reading
337    * bytes off the network.
338    *
339    * <p>Note that this method is not idempotent! The downloader does not attempt to merge/de-dupe
340    * incoming requests, even if the same exact request is executed twice. The calling code needs to
341    * be careful to manage its downloads in such a way that duplicated downloads don't occur.
342    *
343    * <p>TODO: Document what types of exceptions can be set on the returned future, and how clients
344    * are expected to handle them.
345    */
346   @CheckReturnValue
execute(DownloadRequest request)347   public FluentFuture<DownloadResult> execute(DownloadRequest request) {
348     FluentFuture<DownloadResult> resultFuture;
349 
350     synchronized (lock) {
351       ClosingFuture<DownloadResult> closingFuture = enqueueRequest(request);
352       closingFuture.statusFuture().addListener(() -> onDownloadComplete(request), ioExecutor);
353 
354       resultFuture = closingFuture.finishToFuture();
355       unresolvedFutures.add(resultFuture);
356       resultFuture.addListener(
357           () -> {
358             synchronized (lock) {
359               unresolvedFutures.remove(resultFuture);
360             }
361           },
362           MoreExecutors.directExecutor());
363     }
364 
365     logger.logFine("New request enqueued, running queue: %s", request.uri());
366     maybeRunQueuedDownloads();
367     return resultFuture;
368   }
369 
370   /**
371    * Cancels <strong>all</strong> ongoing downloads. This causes all unresolved {@link
372    * FluentFuture} instances returned by {@link #execute} to be cancelled immediately and have their
373    * error callbacks invoked.
374    *
375    * <p>Because implementations of {@link FluentFuture} allow callbacks to be garbage collected
376    * after the future is resolved, calling {@code cancelAll} is an effective way to avoid having the
377    * Downloader leak memory after it is logically no longer needed, e.g. if it is only used from an
378    * Android {@link android.app.Activity}, and that Activity is destroyed.
379    *
380    * <p>However, in general the Downloader should be run from a process-level context, e.g. in an
381    * Android {@link android.app.Service}, so that the downloader doesn't implicitly hold on to
382    * UI-scoped objects.
383    */
cancelAll()384   public void cancelAll() {
385     List<FluentFuture<DownloadResult>> unresolvedFuturesCopy;
386 
387     synchronized (lock) {
388       // Copy the set of unresolved futures to a local variable to avoid hitting
389       // ConcurrentModificationExceptions, which could happen since canceling the future may
390       // trigger the future callback in execute() that removes the future from the set of
391       // unresolved futures.
392       unresolvedFuturesCopy = new ArrayList<>(unresolvedFutures);
393       unresolvedFutures.clear();
394     }
395 
396     for (FluentFuture<DownloadResult> unresolvedFuture : unresolvedFuturesCopy) {
397       unresolvedFuture.cancel(true);
398     }
399   }
400 
401   /**
402    * Registers an {@link StateChangeCallback} with this downloader instance, to be run when various
403    * state changes occur. The callback will be executed on the provided {@link Executor}. Use {@link
404    * #unregisterStateChangeCallback} to unregister a previously registered callback. The callback is
405    * invoked when the following state transitions occur:
406    *
407    * <ul>
408    *   <li>A download was requested via a call to {@link #execute}
409    *   <li>A download completed
410    *   <li>A download started waiting for its connectivity constraints to be satisfied
411    *   <li>A download stopped waiting for its connectivity constraints to be satisfied
412    * </ul>
413    *
414    * <p>A newly registered callback instance will only be called for state changes that are triggered
415    * after the registration; there is no mechanism to replay previous state changes on the callback.
416    *
417    * <p>Registering the same callback multiple times has no effect, except that it will overwrite
418    * the {@link Executor} that was used in a previous registration call.
419    *
420    * <p>Invocation of the callbacks will be internally serialized to avoid concurrent invocations of
421    * the callback with possibly conflicting state. A new invocation of the callback will not start
422    * running until the previous one has completed. If multiple callbacks are being registered that
423    * must be synchronized with each other, then the caller must take care to coordinate this
424    * externally, such as locking in the callbacks or using an executor that guarantees
425    * serialization, such as {@link MoreExecutors#newSequentialExecutor}.
426    *
427    * <p>Warning: Do not use {@link MoreExecutors#directExecutor} or similar executor mechanisms, as
428    * doing so can easily lead to a deadlock, since the internal downloader lock is held while
429    * scheduling the callbacks on the provided executor.
430    */
registerStateChangeCallback(StateChangeCallback callback, Executor executor)431   public void registerStateChangeCallback(StateChangeCallback callback, Executor executor) {
432     synchronized (lock) {
433       stateCallbackMap.put(callback, MoreExecutors.newSequentialExecutor(executor));
434     }
435   }
436 
437   /**
438    * Unregisters a previously registered {@link StateChangeCallback} with this downloader instance.
439    *
440    * <p>Attempting to unregister a callback that was never registered is a no-op.
441    */
unregisterStateChangeCallback(StateChangeCallback callback)442   public void unregisterStateChangeCallback(StateChangeCallback callback) {
443     synchronized (lock) {
444       stateCallbackMap.remove(callback);
445     }
446   }
447 
onDownloadComplete(DownloadRequest request)448   private void onDownloadComplete(DownloadRequest request) {
449     synchronized (lock) {
450       // The number of downloads should be well-balanced and this case should never
451       // trigger, so this is just a defensive check.
452       checkState(
453           numDownloadsInFlight >= 0, "Encountered < 0 downloads in flight, this shouldn't happen");
454     }
455 
456     logger.logFine("Download complete, running queued downloads: %s", request.uri());
457     maybeRunQueuedDownloads();
458   }
459 
maybeRunQueuedDownloads()460   private void maybeRunQueuedDownloads() {
461     // Loop until we run out of download slots or out of queued downloads. It should be impossible
462     // for this to loop forever. Also, note that the synchronized block is inside the loop, since
463     // the outer loop conditions don't touch shared mutable state.
464     while (true) {
465       synchronized (lock) {
466         if (numDownloadsInFlight >= maxConcurrentDownloads) {
467           logger.logInfo("Exceeded max concurrent downloads, not running another queued request");
468           return;
469         }
470 
471         QueuedDownload queuedDownload = queuedDownloads.poll();
472         if (queuedDownload == null) {
473           return;
474         }
475 
476         DownloadRequest request = queuedDownload.request();
477         ListenableFuture<Void> connectivityFuture = checkConnectivity(request);
478         if (connectivityFuture.isDone()) {
479           logger.logFine("Connectivity satisfied; running request. uri=%s", request.uri());
480           numDownloadsInFlight++;
481           ListenableFuture<?> statusFuture = queuedDownload.resultFuture().statusFuture();
482           statusFuture.addListener(
483               () -> {
484                 synchronized (lock) {
485                   numDownloadsInFlight--;
486 
487                   // One less download in flight, let the state change listeners know.
488                   runStateCallbacks();
489                 }
490 
491                 // A download slot was just freed up, run queued downloads again.
492                 logger.logFine(
493                     "Queued download completed, running queued downloads: %s", request.uri());
494                 maybeRunQueuedDownloads();
495               },
496               ioExecutor);
497 
498           // A new download is about to be in flight, let state callbacks know about the
499           // state change.
500           runStateCallbacks();
501           queuedDownload.task().run();
502         } else {
503           logger.logInfo("Waiting on connectivity for request: uri=%s", request.uri());
504           handlePendingConnectivity(connectivityFuture, queuedDownload);
505         }
506       }
507     }
508   }
509 
510   @GuardedBy("lock")
handlePendingConnectivity( ListenableFuture<Void> connectivityFuture, QueuedDownload queuedDownload)511   private void handlePendingConnectivity(
512       ListenableFuture<Void> connectivityFuture, QueuedDownload queuedDownload) {
513     // Keep track of the number of requests waiting.
514     numDownloadsPendingConnectivity++;
515     connectivityFuture.addListener(
516         () -> {
517           synchronized (lock) {
518             numDownloadsPendingConnectivity--;
519 
520             // Let the listeners know we are no longer waiting.
521             runStateCallbacks();
522           }
523         },
524         MoreExecutors.directExecutor());
525 
526     // Let the listeners know we're waiting.
527     runStateCallbacks();
528 
529     Futures.addCallback(
530         connectivityFuture,
531         new FutureCallback<Void>() {
532           @Override
533           public void onSuccess(Void result1) {
534             logger.logInfo("Connectivity changed, running queued requests");
535             requeue(queuedDownload);
536           }
537 
538           @Override
539           public void onFailure(Throwable t) {
540             if (t instanceof TimeoutException) {
541               logger.logInfo("Timed out waiting for connectivity change");
542               requeue(queuedDownload);
543             } else if (t instanceof CancellationException) {
544               logger.logFine("Connectivity future cancelled, running queued downloads");
545               maybeRunQueuedDownloads();
546             } else {
547               logger.logError(t, "Error observing connectivity changes");
548               cancelAll();
549             }
550           }
551         },
552         ioExecutor);
553 
554     // Run state callbacks and cancel the connectivity future when the result task resolves.
555     // It doesn't matter if it succeeded or failed, either way it means we no longer need to wait
556     // for connectivity.
557     queuedDownload
558         .task()
559         .addListener(
560             () -> {
561               synchronized (lock) {
562                 logger.logInfo("Queued task completed, cancelling connectivity check");
563                 runStateCallbacks();
564                 connectivityFuture.cancel(false);
565               }
566             },
567             MoreExecutors.directExecutor());
568   }
569 
requeue(QueuedDownload queuedDownload)570   private void requeue(QueuedDownload queuedDownload) {
571     synchronized (lock) {
572       addToQueue(queuedDownload);
573     }
574 
575     logger.logInfo(
576         "Requeuing download after connectivity change: %s", queuedDownload.request().uri());
577     maybeRunQueuedDownloads();
578   }
579 
enqueueRequest(DownloadRequest request)580   private ClosingFuture<DownloadResult> enqueueRequest(DownloadRequest request) {
581     synchronized (lock) {
582       ListenableFutureTask<Void> task = ListenableFutureTask.create(() -> null);
583       // When the task runs (i.e. is taken off the queue and is explicitly run), all pre-flight
584       // checks should have been made, so at that point the request is send to the underlying
585       // network stack for execution.
586       ClosingFuture<DownloadResult> resultFuture =
587           ClosingFuture.from(task)
588               .transformAsync((closer, result) -> runRequest(request), ioExecutor);
589       addToQueue(QueuedDownload.create(request, task, resultFuture));
590       return resultFuture;
591     }
592   }
593 
594   @GuardedBy("lock")
addToQueue(QueuedDownload queuedDownload)595   private void addToQueue(QueuedDownload queuedDownload) {
596     queuedDownloads.add(queuedDownload);
597 
598     // Make sure that when the task completes, the queued download is definitely removed
599     // from the queue. This is necessary to be robust in the face of cancellation, as
600     // canceled tasks may not get removed from the queue otherwise.
601     queuedDownload
602         .task()
603         .addListener(
604             () -> {
605               synchronized (lock) {
606                 if (queuedDownloads.remove(queuedDownload)) {
607                   // If the queued download was actually removed, update the state callbacks
608                   // to reflect the state change.
609                   runStateCallbacks();
610                 }
611               }
612             },
613             MoreExecutors.directExecutor());
614 
615     // Now that a new request is on the queue, run the state callbacks.
616     runStateCallbacks();
617   }
618 
runRequest(DownloadRequest request)619   private ClosingFuture<DownloadResult> runRequest(DownloadRequest request) throws IOException {
620     URI uri = request.uri();
621     UrlEngine urlEngine = checkNotNull(urlEngineMap.get(uri.getScheme()));
622     UrlRequest.Builder urlRequestBuilder = urlEngine.createRequest(uri.toString());
623     for (Map.Entry<String, String> entry : request.headers().entries()) {
624       urlRequestBuilder.addHeader(entry.getKey(), entry.getValue());
625     }
626 
627     long numExistingBytes = request.destination().numExistingBytes();
628     if (numExistingBytes > 0) {
629       logger.logInfo(
630           "Existing bytes found. numExistingBytes=%d, uri=%s", numExistingBytes, request.uri());
631       urlRequestBuilder.addHeader(HttpHeaders.RANGE, "bytes=" + numExistingBytes + "-");
632 
633       DownloadMetadata destinationMetadata = request.destination().readMetadata();
634       String contentTag = destinationMetadata.getContentTag();
635       long lastModifiedTimeSeconds = destinationMetadata.getLastModifiedTimeSeconds();
636       if (!contentTag.isEmpty()) {
637         urlRequestBuilder.addHeader(HttpHeaders.IF_RANGE, contentTag);
638       } else if (lastModifiedTimeSeconds > 0) {
639         urlRequestBuilder.addHeader(
640             HttpHeaders.IF_RANGE, formatRfc1123Date(lastModifiedTimeSeconds));
641       } else {
642         // TODO: This should probably just clear the destination and remove the Range
643         // header so there's no chance of data corruption. Leaving this as-is for now to
644         // keep supporting range requests for offline maps.
645         logger.logWarning(
646             "Sending range request without If-Range header, due to missing destination "
647                 + "metadata. Data corruption is possible.");
648       }
649     }
650 
651     ListenableFuture<UrlRequest> urlRequestFuture;
652     OAuthTokenProvider oAuthTokenProvider = request.oAuthTokenProvider();
653     if (oAuthTokenProvider != null) {
654       urlRequestFuture =
655           Futures.transform(
656               oAuthTokenProvider.provideOAuthToken(uri),
657               authToken -> {
658                 if (authToken != null) {
659                   urlRequestBuilder.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + authToken);
660                 }
661                 return urlRequestBuilder.build();
662               },
663               ioExecutor);
664     } else {
665       urlRequestFuture = Futures.immediateFuture(urlRequestBuilder.build());
666     }
667 
668     return ClosingFuture.from(urlRequestFuture)
669         .transform((closer, urlRequest) -> checkNotNull(urlRequest).send(), ioExecutor)
670         .transformAsync(
671             (closer, responseFuture) -> completeRequest(request, checkNotNull(responseFuture)),
672             ioExecutor);
673   }
674 
675   /**
676    * @param request the original request that triggered this call
677    * @param responseFuture the {@link UrlResponse}, provided asynchronously via a {@link
678    *     ListenableFuture}
679    */
completeRequest( DownloadRequest request, ListenableFuture<UrlResponse> responseFuture)680   private ClosingFuture<DownloadResult> completeRequest(
681       DownloadRequest request, ListenableFuture<UrlResponse> responseFuture) {
682     return ClosingFuture.from(responseFuture)
683         .transformAsync(
684             (closer, urlResponse) -> {
685               checkNotNull(urlResponse);
686               // We want to close the response regardless of whether we succeed or fail.
687               closer.eventuallyClose(urlResponse, ioExecutor);
688               logger.logFine(
689                   "Got URL response, starting to read response body. uri=%s", request.uri());
690               DownloadDestination destination = request.destination();
691               if (request.headers().containsKey(HttpHeaders.RANGE)
692                   && checkNotNull(urlResponse).getResponseCode() != HTTP_PARTIAL_CONTENT) {
693                 logger.logFine("Clearing %s as our range request wasn't honored", destination);
694                 destination.clear();
695               }
696               WritableByteChannel byteChannel =
697                   destination.openByteChannel(
698                       parseResponseStartOffset(urlResponse), parseResponseMetadata(urlResponse));
699               closer.eventuallyClose(byteChannel, ioExecutor);
700               return ClosingFuture.from(checkNotNull(urlResponse).readResponseBody(byteChannel));
701             },
702             ioExecutor)
703         .catching(
704             RequestException.class,
705             (closer, requestException) -> {
706               if (checkNotNull(requestException).getErrorDetails().getHttpStatusCode()
707                   == HTTP_RANGE_NOT_SATISFIABLE) {
708                 // This is a bit of a special edge case. Encountering this error means the server
709                 // rejected our HTTP range request because it was outside the range of available
710                 // bytes. This may well mean that the request was malformed (e.g. data on disk was
711                 // corrupted and the existing file size ended up larger than what exists on the
712                 // server). But the more common cause for this error is that the entire file was
713                 // in fact already downloaded, so the requested range would cover 0 bytes, which
714                 // the server interprets as not satisfiable. To mitigate that case we simply return
715                 // a success state here by indicating 0 bytes were downloaded.
716 
717                 // This isn't exactly ideal, as it means that a potential class of errors will
718                 // go unnoticed. A better solution might find a way to distinguish between the
719                 // common, file-already-downloaded case and the file corrupted case. This solution
720                 // is put in place mainly to retain parity with the older downloader implementation.
721                 return 0L;
722               } else {
723                 throw new DownloadException(requestException);
724               }
725             },
726             ioExecutor)
727         .transform(
728             (closer, bytesWritten) -> {
729               logger.logFine(
730                   "Response body written. bytesWritten=%d, uri=%s",
731                   checkNotNull(bytesWritten), request.uri());
732               return DownloadResult.create(checkNotNull(bytesWritten));
733             },
734             ioExecutor)
735         .catchingAsync(
736             Exception.class,
737             (closer, exception) -> {
738               ClosingFuture<DownloadResult> result;
739               synchronized (lock) {
740                 logger.logWarning(
741                     exception, "Error reading download result. uri=%s", request.uri());
742                 RequestException requestException = getRequestException(exception);
743                 if (requestException != null
744                     && requestException.getErrorDetails().isRetryableAsIs()) {
745                   // Retry the request by just re-enqueueing it. Note that we also need to
746                   // call maybeRunQueuedRequest to keep the queue moving. Also, in this particular
747                   // case we need to decrement the in-flight downloads count, as we are taking a
748                   // previously in-flight download and putting it back in the queue without
749                   // resolving the underlying result future.
750                   numDownloadsInFlight--;
751                   result = enqueueRequest(request);
752                 } else {
753                   throw new DownloadException(exception);
754                 }
755               }
756 
757               logger.logInfo("Running queued downloads after handling request exception");
758               maybeRunQueuedDownloads();
759               return result;
760             },
761             ioExecutor);
762   }
763 
764   @GuardedBy("lock")
checkConnectivity(DownloadRequest request)765   private ListenableFuture<Void> checkConnectivity(DownloadRequest request) {
766     if (!SCHEMES_REQUIRING_CONNECTIVITY.contains(request.uri().getScheme())) {
767       return Futures.immediateVoidFuture();
768     }
769 
770     return connectivityHandler.checkConnectivity(request.downloadConstraints());
771   }
772 
773   @GuardedBy("lock")
runStateCallbacks()774   private void runStateCallbacks() {
775     State state =
776         State.create(numDownloadsInFlight, queuedDownloads.size(), numDownloadsPendingConnectivity);
777     for (Map.Entry<StateChangeCallback, Executor> callbackEntry : stateCallbackMap.entrySet()) {
778       callbackEntry.getValue().execute(() -> callbackEntry.getKey().onStateChange(state));
779     }
780   }
781 
formatRfc1123Date(long unixTimeSeconds)782   private static String formatRfc1123Date(long unixTimeSeconds) {
783     synchronized (SIMPLE_DATE_FORMAT_LOCK) {
784       return RFC_1123_FORMATTER.format(new Date(SECONDS.toMillis(unixTimeSeconds)));
785     }
786   }
787 
parseResponseStartOffset(UrlResponse response)788   private static long parseResponseStartOffset(UrlResponse response) throws DownloadException {
789     if (response.getResponseCode() != HttpURLConnection.HTTP_PARTIAL) {
790       return 0;
791     }
792 
793     List<String> contentRangeHeaders = response.getResponseHeaders().get(HttpHeaders.CONTENT_RANGE);
794 
795     checkDownloadState(
796         contentRangeHeaders != null && !contentRangeHeaders.isEmpty(),
797         "Host returned 206/PARTIAL response code but didn't provide a "
798             + "'Content-Range' response header");
799     String contentRangeHeader = checkNotNull(contentRangeHeaders).get(0);
800     Matcher matcher = CONTENT_RANGE_HEADER_PATTERN.matcher(contentRangeHeader);
801     checkDownloadState(
802         matcher.matches() && matcher.groupCount() > 0,
803         "Content-Range response header didn't match expected pattern. " + "Was '%s', expected '%s'",
804         contentRangeHeader,
805         CONTENT_RANGE_HEADER_PATTERN.pattern());
806     return Long.parseLong(checkNotNull(matcher.group(1)));
807   }
808 
parseResponseMetadata(UrlResponse response)809   private static DownloadMetadata parseResponseMetadata(UrlResponse response)
810       throws DownloadException {
811     String contentTag = parseResponseContentTag(response);
812     long lastModifiedTimeSeconds = parseResponseModifiedTime(response);
813     return DownloadMetadata.create(contentTag, lastModifiedTimeSeconds);
814   }
815 
parseResponseModifiedTime(UrlResponse response)816   private static long parseResponseModifiedTime(UrlResponse response) throws DownloadException {
817     List<String> lastModifiedHeaders = response.getResponseHeaders().get(HttpHeaders.LAST_MODIFIED);
818     if (lastModifiedHeaders == null || lastModifiedHeaders.isEmpty()) {
819       return 0L;
820     }
821 
822     String lastModifiedHeader = lastModifiedHeaders.get(0);
823     Date date;
824 
825     try {
826       synchronized (SIMPLE_DATE_FORMAT_LOCK) {
827         date = RFC_1123_FORMATTER.parse(lastModifiedHeader);
828       }
829       if (date == null) {
830         throw new DownloadException("Invalid Last-Modified header: " + lastModifiedHeader);
831       }
832     } catch (ParseException e) {
833       throw new DownloadException("Invalid Last-Modified header: " + lastModifiedHeader, e);
834     }
835 
836     return MILLISECONDS.toSeconds(date.getTime());
837   }
838 
parseResponseContentTag(UrlResponse response)839   private static String parseResponseContentTag(UrlResponse response) {
840     List<String> contentTagHeaders = response.getResponseHeaders().get(HttpHeaders.ETAG);
841     if (contentTagHeaders == null || contentTagHeaders.isEmpty()) {
842       return "";
843     }
844 
845     return contentTagHeaders.get(0);
846   }
847 
848   @AutoValue
849   abstract static class QueuedDownload {
request()850     abstract DownloadRequest request();
851 
task()852     abstract ListenableFutureTask<?> task();
853 
resultFuture()854     abstract ClosingFuture<DownloadResult> resultFuture();
855 
create( DownloadRequest request, ListenableFutureTask<?> task, ClosingFuture<DownloadResult> resultFuture)856     static QueuedDownload create(
857         DownloadRequest request,
858         ListenableFutureTask<?> task,
859         ClosingFuture<DownloadResult> resultFuture) {
860       return new AutoValue_Downloader_QueuedDownload(request, task, resultFuture);
861     }
862   }
863 
864   @VisibleForTesting
865   @Nullable
getRequestException(@ullable Throwable throwable)866   static RequestException getRequestException(@Nullable Throwable throwable) {
867     if (throwable == null) {
868       return null;
869     } else {
870       return (RequestException)
871           Iterables.find(
872               getCausalChain(throwable),
873               instanceOf(RequestException.class),
874               /* defaultValue= */ null);
875     }
876   }
877 
878   @FormatMethod
checkDownloadState( boolean state, @FormatString String message, Object... args)879   private static void checkDownloadState(
880       boolean state, @FormatString String message, Object... args) throws DownloadException {
881     if (!state) {
882       throw new DownloadException(String.format(message, args));
883     }
884   }
885 }
886