/* * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.android.libraries.mobiledatadownload.lite; import android.net.Uri; import androidx.annotation.VisibleForTesting; import com.google.android.libraries.mobiledatadownload.TimeSource; import com.google.android.libraries.mobiledatadownload.file.spi.Monitor; import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.HashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; /** A Download Progress Monitor to support {@link DownloadListener}. */ @ThreadSafe public class DownloadProgressMonitor implements Monitor, SingleFileDownloadProgressMonitor { private static final String TAG = "DownloadProgressMonitor"; private final TimeSource timeSource; private final Executor sequentialControlExecutor; private DownloadProgressMonitor(TimeSource timeSource, Executor controlExecutor) { this.timeSource = timeSource; // We want onProgress to be executed in order otherwise clients will observe out of order // updates (bigger current size update appears before smaller current size update). // We use Sequential Executor to ensure the onProgress will be processed sequentially. this.sequentialControlExecutor = MoreExecutors.newSequentialExecutor(controlExecutor); } /** Constructor overload with {@link TimeSource}. */ // NOTE: this is necessary for use by other MDD components. public static DownloadProgressMonitor create(TimeSource timeSource, Executor controlExecutor) { return new DownloadProgressMonitor(timeSource, controlExecutor); } // We will only broadcast on progress notification at most once in this time frame. // Currently MobStore Monitor notify every 8KB of downloaded bytes. This may be too chatty on // fast network. // 1000 was chosen arbitrarily. @VisibleForTesting static final long BUFFERED_TIME_MS = 1000; @GuardedBy("DownloadProgressMonitor.class") private final HashMap uriToDownloadedBytesCounter = new HashMap<>(); @Override @Nullable public Monitor.InputMonitor monitorRead(Uri uri) { return null; } @Override @Nullable public Monitor.OutputMonitor monitorWrite(Uri uri) { synchronized (DownloadProgressMonitor.class) { if (uriToDownloadedBytesCounter.get(uri) == null) { // All monitors for a shared FileStorage will be invoked for all file accesses through this // shared FileStorage. So this monitor can receive non-MDD-Lite Uri. return null; } return uriToDownloadedBytesCounter.get(uri); } } @Override @Nullable public Monitor.OutputMonitor monitorAppend(Uri uri) { return monitorWrite(uri); } public void pausedForConnectivity() { synchronized (DownloadProgressMonitor.class) { for (DownloadedBytesCounter downloadedBytesCounter : uriToDownloadedBytesCounter.values()) { downloadedBytesCounter.pausedForConnectivity(); } } } @Override public void addDownloadListener(Uri uri, DownloadListener downloadListener) { synchronized (DownloadProgressMonitor.class) { if (!uriToDownloadedBytesCounter.containsKey(uri)) { uriToDownloadedBytesCounter.put(uri, new DownloadedBytesCounter(uri, downloadListener)); } } } @Override public void removeDownloadListener(Uri uri) { synchronized (DownloadProgressMonitor.class) { uriToDownloadedBytesCounter.remove(uri); } } // A counter for bytes downloaded. private final class DownloadedBytesCounter implements Monitor.OutputMonitor { private final Uri uri; private final DownloadListener downloadListener; private final AtomicLong byteCounter = new AtomicLong(); // Last timestamp that we broadcast on progress. private long lastBroadcastOnProgressTimestampMs; DownloadedBytesCounter(Uri uri, DownloadListener downloadListener) { this.uri = uri; this.downloadListener = downloadListener; lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis(); } @Override public void bytesWritten(byte[] b, int off, int len) { notifyProgress(len); } private void notifyProgress(long len) { // Only broadcast progress update every BUFFERED_TIME_MS. // It will be fast (no locking) when there is no need to broadcast progress. // When there is a need to broadcast progress, we need to obtain the lock due to 2 reasons: // 1- Concurrent access to uriToDownloadedBytesCounter. // 2- Prevent out of order progress update. if (timeSource.currentTimeMillis() - lastBroadcastOnProgressTimestampMs < BUFFERED_TIME_MS) { byteCounter.getAndAdd(len); LogUtil.v( "%s: Received data for uri = %s, len = %d, Counter = %d", TAG, uri, len, byteCounter.get()); } else { synchronized (DownloadProgressMonitor.class) { // Reset timestamp. lastBroadcastOnProgressTimestampMs = timeSource.currentTimeMillis(); byteCounter.getAndAdd(len); LogUtil.v( "%s: Received data for uri = %s, len = %d, Counter = %d", TAG, uri, len, byteCounter.get()); if (uriToDownloadedBytesCounter.containsKey(uri)) { sequentialControlExecutor.execute(() -> downloadListener.onProgress(byteCounter.get())); } } } } public void pausedForConnectivity() { downloadListener.onPausedForConnectivity(); } } }