1 /* 2 * Copyright 2022 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.google.android.libraries.mobiledatadownload.downloader.offroad; 17 18 import static com.google.common.base.Preconditions.checkNotNull; 19 20 import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil; 21 import com.google.errorprone.annotations.concurrent.GuardedBy; 22 import java.util.ArrayDeque; 23 import java.util.Queue; 24 import java.util.concurrent.Executor; 25 26 /** 27 * Passes tasks to a delegate {@link Executor} for execution, ensuring that no more than a fixed 28 * number of them are submitted at any one time. If the limit is reached, then new tasks will be 29 * queued up and submitted when possible. 30 * 31 * <p>If the delegate {@link Executor} is only accessed via some {@link ThrottlingExecutor}, this 32 * effectively limits the number of concurrent tasks/threads. Alternatively, we can wrap a shared 33 * {@link Executor} in a {@link ThrottlingExecutor} before passing it to some component, to limit 34 * the amount of concurrency that component can request. 35 * 36 * <p>If the delegate {@link Executor} rejects queued tasks, they will be silently dropped. To keep 37 * the implementation simple, should the delegate {@link Executor} reject tasks, then we may end up 38 * with some tasks queued even if fewer tasks are actually running than our bound allows. We will 39 * keep submitting new tasks, however, so that transient problems in the underlying {@link Executor} 40 * do not prevent new tasks from running. 41 */ 42 public class ThrottlingExecutor implements Executor { 43 private static final String TAG = "ThrottlingExecutor"; 44 45 private final Executor delegateExecutor; 46 private final int maximumThreads; 47 48 private final Object lock = new Object(); 49 50 @GuardedBy("lock") 51 private int count = 0; 52 53 @GuardedBy("lock") 54 private Queue<Runnable> queue = new ArrayDeque<>(); 55 ThrottlingExecutor(Executor delegate, int maximumThreads)56 public ThrottlingExecutor(Executor delegate, int maximumThreads) { 57 delegateExecutor = delegate; 58 this.maximumThreads = maximumThreads; 59 } 60 61 @Override execute(Runnable runnable)62 public void execute(Runnable runnable) { 63 checkNotNull(runnable); 64 65 synchronized (lock) { 66 if (count >= maximumThreads) { 67 queue.add(runnable); 68 return; 69 } 70 71 // We're going to run the task immediately, outside this synchronized block 72 count++; 73 } 74 75 try { 76 delegateExecutor.execute(new WrappedRunnable(runnable)); 77 } catch (Throwable t) { 78 synchronized (lock) { 79 count--; 80 } 81 throw t; 82 } 83 } 84 85 /** 86 * Submits the next task from the {@link Queue}, decrementing the count of actively running tasks 87 * if there aren't any. This method is immediately after a {@link Runnable} has completed. 88 * 89 * <p>There are a couple of design points here. 90 * 91 * <p>Firstly, how do we run the next task? We could either submit the next task using {@link 92 * Executor#execute()}, or we could run it inline. The first approach is simpler, but has the 93 * drawback that the delegate Executor can see more than {@code maximumThreads} tasks running 94 * simultaneously: as we are submitted here from the end of an old task, the Executor briefly 95 * considers both the old and new task to be running. The second approach avoids this and may be 96 * slightly more efficient, but has added complexity (in particular in exception handling - we 97 * still want any unchecked Thowables thrown from a task to be propagated on). Also, if other 98 * tasks are waiting to run on the delegate {@link Executor} without passing through this 99 * throttle, then the second approach can prevent those tasks from having a chance to run. If in 100 * doubt, keep it simple - so we adopt the first approach. 101 * 102 * <p>Secondly, we want any Throwables thrown during the task to be propagated on to the delegate 103 * {@link Executor}, as if the {@link ThrottlingExecutor} wasn't in the way. But what about 104 * exceptions thrown by the {@link Executor#execute} method? That method could throw {@link 105 * RejectedExecutionException} in particular if the executor has been shut down but also for any 106 * other reason. It could also throw the usual range of unchecked exceptions. In either situation 107 * we simply decrement the count so that newly submitted tasks can still be attempted. Note that 108 * this approach, while simple, can leave some tasks "stranded" on the queue until other tasks are 109 * submitted and finish - if the underlying executor has been shutdown or permanently broken then 110 * this makes no difference; otherwise should never arise but if they do, at least new tasks can 111 * attempt to be run. 112 */ submitNextTaskOrDecrementCount()113 private void submitNextTaskOrDecrementCount() { 114 Runnable toSubmit; 115 synchronized (lock) { 116 toSubmit = queue.poll(); 117 if (toSubmit == null) { 118 count--; 119 return; 120 } 121 } 122 123 try { 124 delegateExecutor.execute(new WrappedRunnable(toSubmit)); 125 } catch (Throwable t) { 126 // Suppress this exception, which is probably a RejectedExecutionException. We're called from 127 // the finally block after some other task has completed, and we don't want to suppress any 128 // exception from the just-completed task. 129 LogUtil.e(t, "%s: Task submission failed: %s", TAG, toSubmit); 130 synchronized (lock) { 131 count--; 132 } 133 } 134 } 135 136 private class WrappedRunnable implements Runnable { 137 private final Runnable delegateRunnable; 138 WrappedRunnable(Runnable delegate)139 public WrappedRunnable(Runnable delegate) { 140 delegateRunnable = delegate; 141 } 142 143 @Override run()144 public void run() { 145 try { 146 delegateRunnable.run(); 147 } finally { 148 submitNextTaskOrDecrementCount(); 149 } 150 } 151 } 152 } 153