• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.google.android.libraries.mobiledatadownload.downloader.offroad;
17 
18 import static com.google.common.base.Preconditions.checkNotNull;
19 
20 import com.google.android.libraries.mobiledatadownload.internal.logging.LogUtil;
21 import com.google.errorprone.annotations.concurrent.GuardedBy;
22 import java.util.ArrayDeque;
23 import java.util.Queue;
24 import java.util.concurrent.Executor;
25 
26 /**
27  * Passes tasks to a delegate {@link Executor} for execution, ensuring that no more than a fixed
28  * number of them are submitted at any one time. If the limit is reached, then new tasks will be
29  * queued up and submitted when possible.
30  *
31  * <p>If the delegate {@link Executor} is only accessed via some {@link ThrottlingExecutor}, this
32  * effectively limits the number of concurrent tasks/threads. Alternatively, we can wrap a shared
33  * {@link Executor} in a {@link ThrottlingExecutor} before passing it to some component, to limit
34  * the amount of concurrency that component can request.
35  *
36  * <p>If the delegate {@link Executor} rejects queued tasks, they will be silently dropped. To keep
37  * the implementation simple, should the delegate {@link Executor} reject tasks, then we may end up
38  * with some tasks queued even if fewer tasks are actually running than our bound allows. We will
39  * keep submitting new tasks, however, so that transient problems in the underlying {@link Executor}
40  * do not prevent new tasks from running.
41  */
42 public class ThrottlingExecutor implements Executor {
43   private static final String TAG = "ThrottlingExecutor";
44 
45   private final Executor delegateExecutor;
46   private final int maximumThreads;
47 
48   private final Object lock = new Object();
49 
50   @GuardedBy("lock")
51   private int count = 0;
52 
53   @GuardedBy("lock")
54   private Queue<Runnable> queue = new ArrayDeque<>();
55 
ThrottlingExecutor(Executor delegate, int maximumThreads)56   public ThrottlingExecutor(Executor delegate, int maximumThreads) {
57     delegateExecutor = delegate;
58     this.maximumThreads = maximumThreads;
59   }
60 
61   @Override
execute(Runnable runnable)62   public void execute(Runnable runnable) {
63     checkNotNull(runnable);
64 
65     synchronized (lock) {
66       if (count >= maximumThreads) {
67         queue.add(runnable);
68         return;
69       }
70 
71       // We're going to run the task immediately, outside this synchronized block
72       count++;
73     }
74 
75     try {
76       delegateExecutor.execute(new WrappedRunnable(runnable));
77     } catch (Throwable t) {
78       synchronized (lock) {
79         count--;
80       }
81       throw t;
82     }
83   }
84 
85   /**
86    * Submits the next task from the {@link Queue}, decrementing the count of actively running tasks
87    * if there aren't any. This method is immediately after a {@link Runnable} has completed.
88    *
89    * <p>There are a couple of design points here.
90    *
91    * <p>Firstly, how do we run the next task? We could either submit the next task using {@link
92    * Executor#execute()}, or we could run it inline. The first approach is simpler, but has the
93    * drawback that the delegate Executor can see more than {@code maximumThreads} tasks running
94    * simultaneously: as we are submitted here from the end of an old task, the Executor briefly
95    * considers both the old and new task to be running. The second approach avoids this and may be
96    * slightly more efficient, but has added complexity (in particular in exception handling - we
97    * still want any unchecked Thowables thrown from a task to be propagated on). Also, if other
98    * tasks are waiting to run on the delegate {@link Executor} without passing through this
99    * throttle, then the second approach can prevent those tasks from having a chance to run. If in
100    * doubt, keep it simple - so we adopt the first approach.
101    *
102    * <p>Secondly, we want any Throwables thrown during the task to be propagated on to the delegate
103    * {@link Executor}, as if the {@link ThrottlingExecutor} wasn't in the way. But what about
104    * exceptions thrown by the {@link Executor#execute} method? That method could throw {@link
105    * RejectedExecutionException} in particular if the executor has been shut down but also for any
106    * other reason. It could also throw the usual range of unchecked exceptions. In either situation
107    * we simply decrement the count so that newly submitted tasks can still be attempted. Note that
108    * this approach, while simple, can leave some tasks "stranded" on the queue until other tasks are
109    * submitted and finish - if the underlying executor has been shutdown or permanently broken then
110    * this makes no difference; otherwise should never arise but if they do, at least new tasks can
111    * attempt to be run.
112    */
submitNextTaskOrDecrementCount()113   private void submitNextTaskOrDecrementCount() {
114     Runnable toSubmit;
115     synchronized (lock) {
116       toSubmit = queue.poll();
117       if (toSubmit == null) {
118         count--;
119         return;
120       }
121     }
122 
123     try {
124       delegateExecutor.execute(new WrappedRunnable(toSubmit));
125     } catch (Throwable t) {
126       // Suppress this exception, which is probably a RejectedExecutionException. We're called from
127       // the finally block after some other task has completed, and we don't want to suppress any
128       // exception from the just-completed task.
129       LogUtil.e(t, "%s: Task submission failed: %s", TAG, toSubmit);
130       synchronized (lock) {
131         count--;
132       }
133     }
134   }
135 
136   private class WrappedRunnable implements Runnable {
137     private final Runnable delegateRunnable;
138 
WrappedRunnable(Runnable delegate)139     public WrappedRunnable(Runnable delegate) {
140       delegateRunnable = delegate;
141     }
142 
143     @Override
run()144     public void run() {
145       try {
146         delegateRunnable.run();
147       } finally {
148         submitNextTaskOrDecrementCount();
149       }
150     }
151   }
152 }
153