• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.google.android.libraries.mobiledatadownload.lite;
17 
18 import android.net.Uri;
19 import androidx.annotation.VisibleForTesting;
20 import com.google.android.libraries.mobiledatadownload.TimeSource;
21 import com.google.android.libraries.mobiledatadownload.file.spi.Monitor;
22 import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import com.google.errorprone.annotations.concurrent.GuardedBy;
25 import java.util.HashMap;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.atomic.AtomicLong;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.ThreadSafe;
30 
31 /** A Download Progress Monitor to support {@link DownloadListener}. */
32 @ThreadSafe
33 public class DownloadProgressMonitor implements Monitor, SingleFileDownloadProgressMonitor {
34 
35   private static final String TAG = "DownloadProgressMonitor";
36 
37   private final TimeSource timeSource;
38   private final Executor sequentialControlExecutor;
39 
DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor)40   private DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor) {
41     this.timeSource = timeSource;
42 
43     // We want onProgress to be executed in order otherwise clients will observe out of order
44     // updates (bigger current size update appears before smaller current size update).
45     // We use Sequential Executor to ensure the onProgress will be processed sequentially.
46     this.sequentialControlExecutor = MoreExecutors.newSequentialExecutor(controlExecutor);
47   }
48 
49   /** Constructor overload with {@link TimeSource}. */
50   // NOTE: this is necessary for use by other MDD components.
create(TimeSource timeSource, Executor controlExecutor)51   public static DownloadProgressMonitor create(TimeSource timeSource, Executor controlExecutor) {
52     return new DownloadProgressMonitor(timeSource, controlExecutor);
53   }
54 
55   // We will only broadcast on progress notification at most once in this time frame.
56   // Currently MobStore Monitor notify every 8KB of downloaded bytes. This may be too chatty on
57   // fast network.
58   // 1000 was chosen arbitrarily.
59   @VisibleForTesting static final long BUFFERED_TIME_MS = 1000;
60 
61   @GuardedBy("DownloadProgressMonitor.class")
62   private final HashMap<Uri, DownloadedBytesCounter> uriToDownloadedBytesCounter = new HashMap<>();
63 
64   @Override
65   @Nullable
monitorRead(Uri uri)66   public Monitor.InputMonitor monitorRead(Uri uri) {
67     return null;
68   }
69 
70   @Override
71   @Nullable
monitorWrite(Uri uri)72   public Monitor.OutputMonitor monitorWrite(Uri uri) {
73     synchronized (DownloadProgressMonitor.class) {
74       if (uriToDownloadedBytesCounter.get(uri) == null) {
75         // All monitors for a shared FileStorage will be invoked for all file accesses through this
76         // shared FileStorage. So this monitor can receive non-MDD-Lite Uri.
77         return null;
78       }
79       return uriToDownloadedBytesCounter.get(uri);
80     }
81   }
82 
83   @Override
84   @Nullable
monitorAppend(Uri uri)85   public Monitor.OutputMonitor monitorAppend(Uri uri) {
86     return monitorWrite(uri);
87   }
88 
pausedForConnectivity()89   public void pausedForConnectivity() {
90     synchronized (DownloadProgressMonitor.class) {
91       for (DownloadedBytesCounter downloadedBytesCounter : uriToDownloadedBytesCounter.values()) {
92         downloadedBytesCounter.pausedForConnectivity();
93       }
94     }
95   }
96 
97   @Override
addDownloadListener(Uri uri, DownloadListener downloadListener)98   public void addDownloadListener(Uri uri, DownloadListener downloadListener) {
99     synchronized (DownloadProgressMonitor.class) {
100       if (!uriToDownloadedBytesCounter.containsKey(uri)) {
101         uriToDownloadedBytesCounter.put(uri, new DownloadedBytesCounter(uri, downloadListener));
102       }
103     }
104   }
105 
106   @Override
removeDownloadListener(Uri uri)107   public void removeDownloadListener(Uri uri) {
108     synchronized (DownloadProgressMonitor.class) {
109       uriToDownloadedBytesCounter.remove(uri);
110     }
111   }
112 
113   // A counter for bytes downloaded.
114   private final class DownloadedBytesCounter implements Monitor.OutputMonitor {
115     private final Uri uri;
116     private final DownloadListener downloadListener;
117 
118     private final AtomicLong byteCounter = new AtomicLong();
119 
120     // Last timestamp that we broadcast on progress.
121     private long lastBroadcastOnProgressTimestampMs;
122 
DownloadedBytesCounter(Uri uri, DownloadListener downloadListener)123     DownloadedBytesCounter(Uri uri, DownloadListener downloadListener) {
124       this.uri = uri;
125       this.downloadListener = downloadListener;
126       lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis();
127     }
128 
129     @Override
bytesWritten(byte[] b, int off, int len)130     public void bytesWritten(byte[] b, int off, int len) {
131       notifyProgress(len);
132     }
133 
notifyProgress(long len)134     private void notifyProgress(long len) {
135       // Only broadcast progress update every BUFFERED_TIME_MS.
136       // It will be fast (no locking) when there is no need to broadcast progress.
137       // When there is a need to broadcast progress, we need to obtain the lock due to 2 reasons:
138       // 1- Concurrent access to uriToDownloadedBytesCounter.
139       // 2- Prevent out of order progress update.
140       if (timeSource.currentTimeMillis() - lastBroadcastOnProgressTimestampMs < BUFFERED_TIME_MS) {
141         byteCounter.getAndAdd(len);
142         LogUtil.v(
143             "%s: Received data for uri = %s, len = %d, Counter = %d",
144             TAG, uri, len, byteCounter.get());
145       } else {
146         synchronized (DownloadProgressMonitor.class) {
147           // Reset timestamp.
148           lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis();
149 
150           byteCounter.getAndAdd(len);
151           LogUtil.v(
152               "%s: Received data for uri = %s, len = %d, Counter = %d",
153               TAG, uri, len, byteCounter.get());
154 
155           if (uriToDownloadedBytesCounter.containsKey(uri)) {
156             sequentialControlExecutor.execute(() -> downloadListener.onProgress(byteCounter.get()));
157           }
158         }
159       }
160     }
161 
pausedForConnectivity()162     public void pausedForConnectivity() {
163       downloadListener.onPausedForConnectivity();
164     }
165   }
166 }
167