1 /* 2 * Copyright 2022 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.google.android.libraries.mobiledatadownload.lite; 17 18 import android.net.Uri; 19 import androidx.annotation.VisibleForTesting; 20 import com.google.android.libraries.mobiledatadownload.TimeSource; 21 import com.google.android.libraries.mobiledatadownload.file.spi.Monitor; 22 import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil; 23 import com.google.common.util.concurrent.MoreExecutors; 24 import com.google.errorprone.annotations.concurrent.GuardedBy; 25 import java.util.HashMap; 26 import java.util.concurrent.Executor; 27 import java.util.concurrent.atomic.AtomicLong; 28 import javax.annotation.Nullable; 29 import javax.annotation.concurrent.ThreadSafe; 30 31 /** A Download Progress Monitor to support {@link DownloadListener}. */ 32 @ThreadSafe 33 public class DownloadProgressMonitor implements Monitor, SingleFileDownloadProgressMonitor { 34 35 private static final String TAG = "DownloadProgressMonitor"; 36 37 private final TimeSource timeSource; 38 private final Executor sequentialControlExecutor; 39 DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor)40 private DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor) { 41 this.timeSource = timeSource; 42 43 // We want onProgress to be executed in order otherwise clients will observe out of order 44 // updates (bigger current size update appears before smaller current size update). 45 // We use Sequential Executor to ensure the onProgress will be processed sequentially. 46 this.sequentialControlExecutor = MoreExecutors.newSequentialExecutor(controlExecutor); 47 } 48 49 /** Constructor overload with {@link TimeSource}. */ 50 // NOTE: this is necessary for use by other MDD components. create(TimeSource timeSource, Executor controlExecutor)51 public static DownloadProgressMonitor create(TimeSource timeSource, Executor controlExecutor) { 52 return new DownloadProgressMonitor(timeSource, controlExecutor); 53 } 54 55 // We will only broadcast on progress notification at most once in this time frame. 56 // Currently MobStore Monitor notify every 8KB of downloaded bytes. This may be too chatty on 57 // fast network. 58 // 1000 was chosen arbitrarily. 59 @VisibleForTesting static final long BUFFERED_TIME_MS = 1000; 60 61 @GuardedBy("DownloadProgressMonitor.class") 62 private final HashMap<Uri, DownloadedBytesCounter> uriToDownloadedBytesCounter = new HashMap<>(); 63 64 @Override 65 @Nullable monitorRead(Uri uri)66 public Monitor.InputMonitor monitorRead(Uri uri) { 67 return null; 68 } 69 70 @Override 71 @Nullable monitorWrite(Uri uri)72 public Monitor.OutputMonitor monitorWrite(Uri uri) { 73 synchronized (DownloadProgressMonitor.class) { 74 if (uriToDownloadedBytesCounter.get(uri) == null) { 75 // All monitors for a shared FileStorage will be invoked for all file accesses through this 76 // shared FileStorage. So this monitor can receive non-MDD-Lite Uri. 77 return null; 78 } 79 return uriToDownloadedBytesCounter.get(uri); 80 } 81 } 82 83 @Override 84 @Nullable monitorAppend(Uri uri)85 public Monitor.OutputMonitor monitorAppend(Uri uri) { 86 return monitorWrite(uri); 87 } 88 pausedForConnectivity()89 public void pausedForConnectivity() { 90 synchronized (DownloadProgressMonitor.class) { 91 for (DownloadedBytesCounter downloadedBytesCounter : uriToDownloadedBytesCounter.values()) { 92 downloadedBytesCounter.pausedForConnectivity(); 93 } 94 } 95 } 96 97 @Override addDownloadListener(Uri uri, DownloadListener downloadListener)98 public void addDownloadListener(Uri uri, DownloadListener downloadListener) { 99 synchronized (DownloadProgressMonitor.class) { 100 if (!uriToDownloadedBytesCounter.containsKey(uri)) { 101 uriToDownloadedBytesCounter.put(uri, new DownloadedBytesCounter(uri, downloadListener)); 102 } 103 } 104 } 105 106 @Override removeDownloadListener(Uri uri)107 public void removeDownloadListener(Uri uri) { 108 synchronized (DownloadProgressMonitor.class) { 109 uriToDownloadedBytesCounter.remove(uri); 110 } 111 } 112 113 // A counter for bytes downloaded. 114 private final class DownloadedBytesCounter implements Monitor.OutputMonitor { 115 private final Uri uri; 116 private final DownloadListener downloadListener; 117 118 private final AtomicLong byteCounter = new AtomicLong(); 119 120 // Last timestamp that we broadcast on progress. 121 private long lastBroadcastOnProgressTimestampMs; 122 DownloadedBytesCounter(Uri uri, DownloadListener downloadListener)123 DownloadedBytesCounter(Uri uri, DownloadListener downloadListener) { 124 this.uri = uri; 125 this.downloadListener = downloadListener; 126 lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis(); 127 } 128 129 @Override bytesWritten(byte[] b, int off, int len)130 public void bytesWritten(byte[] b, int off, int len) { 131 notifyProgress(len); 132 } 133 notifyProgress(long len)134 private void notifyProgress(long len) { 135 // Only broadcast progress update every BUFFERED_TIME_MS. 136 // It will be fast (no locking) when there is no need to broadcast progress. 137 // When there is a need to broadcast progress, we need to obtain the lock due to 2 reasons: 138 // 1- Concurrent access to uriToDownloadedBytesCounter. 139 // 2- Prevent out of order progress update. 140 if (timeSource.currentTimeMillis() - lastBroadcastOnProgressTimestampMs < BUFFERED_TIME_MS) { 141 byteCounter.getAndAdd(len); 142 LogUtil.v( 143 "%s: Received data for uri = %s, len = %d, Counter = %d", 144 TAG, uri, len, byteCounter.get()); 145 } else { 146 synchronized (DownloadProgressMonitor.class) { 147 // Reset timestamp. 148 lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis(); 149 150 byteCounter.getAndAdd(len); 151 LogUtil.v( 152 "%s: Received data for uri = %s, len = %d, Counter = %d", 153 TAG, uri, len, byteCounter.get()); 154 155 if (uriToDownloadedBytesCounter.containsKey(uri)) { 156 sequentialControlExecutor.execute(() -> downloadListener.onProgress(byteCounter.get())); 157 } 158 } 159 } 160 } 161 pausedForConnectivity()162 public void pausedForConnectivity() { 163 downloadListener.onPausedForConnectivity(); 164 } 165 } 166 } 167