• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import com.google.common.base.Stopwatch;
20 import com.google.common.base.Supplier;
21 import com.google.common.base.Ticker;
22 import com.google.common.util.concurrent.AbstractFuture;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.Delayed;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.PriorityBlockingQueue;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.TimeUnit;
33 
34 /**
35  * A manipulated clock that exports a {@link Ticker} and a {@link ScheduledExecutorService}.
36  *
37  * <p>To simulate the locking scenario of using real executors, it never runs tasks within {@code
38  * schedule()} or {@code execute()}. Instead, you should call {@link #runDueTasks} in your test
39  * method to run all due tasks. {@link #forwardTime} and {@link #forwardNanos} call {@link
40  * #runDueTasks} automatically.
41  */
42 public final class FakeClock {
43 
44   private static final TaskFilter ACCEPT_ALL_FILTER = new TaskFilter() {
45       @Override
46       public boolean shouldAccept(Runnable command) {
47         return true;
48       }
49     };
50 
51   private final ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorImpl();
52 
53   private final PriorityBlockingQueue<ScheduledTask> tasks =
54       new PriorityBlockingQueue<ScheduledTask>();
55 
56   private final Ticker ticker =
57       new Ticker() {
58         @Override public long read() {
59           return currentTimeNanos;
60         }
61       };
62 
63   private final Supplier<Stopwatch> stopwatchSupplier =
64       new Supplier<Stopwatch>() {
65         @Override public Stopwatch get() {
66           return Stopwatch.createUnstarted(ticker);
67         }
68       };
69 
70   private long currentTimeNanos;
71 
72   public class ScheduledTask extends AbstractFuture<Void> implements ScheduledFuture<Void> {
73     public final Runnable command;
74     public final long dueTimeNanos;
75 
ScheduledTask(long dueTimeNanos, Runnable command)76     ScheduledTask(long dueTimeNanos, Runnable command) {
77       this.dueTimeNanos = dueTimeNanos;
78       this.command = command;
79     }
80 
cancel(boolean mayInterruptIfRunning)81     @Override public boolean cancel(boolean mayInterruptIfRunning) {
82       tasks.remove(this);
83       return super.cancel(mayInterruptIfRunning);
84     }
85 
getDelay(TimeUnit unit)86     @Override public long getDelay(TimeUnit unit) {
87       return unit.convert(dueTimeNanos - currentTimeNanos, TimeUnit.NANOSECONDS);
88     }
89 
compareTo(Delayed other)90     @Override public int compareTo(Delayed other) {
91       ScheduledTask otherTask = (ScheduledTask) other;
92       if (dueTimeNanos > otherTask.dueTimeNanos) {
93         return 1;
94       } else if (dueTimeNanos < otherTask.dueTimeNanos) {
95         return -1;
96       } else {
97         return 0;
98       }
99     }
100 
complete()101     void complete() {
102       set(null);
103     }
104 
105     @Override
toString()106     public String toString() {
107       return "[due=" + dueTimeNanos + ", task=" + command + "]";
108     }
109   }
110 
111   private class ScheduledExecutorImpl implements ScheduledExecutorService {
schedule( Callable<V> callable, long delay, TimeUnit unit)112     @Override public <V> ScheduledFuture<V> schedule(
113         Callable<V> callable, long delay, TimeUnit unit) {
114       throw new UnsupportedOperationException();
115     }
116 
schedule(Runnable cmd, long delay, TimeUnit unit)117     @Override public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
118       ScheduledTask task = new ScheduledTask(currentTimeNanos + unit.toNanos(delay), cmd);
119       tasks.add(task);
120       return task;
121     }
122 
scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit)123     @Override public ScheduledFuture<?> scheduleAtFixedRate(
124         Runnable command, long initialDelay, long period, TimeUnit unit) {
125       throw new UnsupportedOperationException();
126     }
127 
scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit)128     @Override public ScheduledFuture<?> scheduleWithFixedDelay(
129         Runnable command, long initialDelay, long delay, TimeUnit unit) {
130       throw new UnsupportedOperationException();
131     }
132 
awaitTermination(long timeout, TimeUnit unit)133     @Override public boolean awaitTermination(long timeout, TimeUnit unit) {
134       throw new UnsupportedOperationException();
135     }
136 
invokeAll(Collection<? extends Callable<T>> tasks)137     @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
138       throw new UnsupportedOperationException();
139     }
140 
invokeAll( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)141     @Override public <T> List<Future<T>> invokeAll(
142         Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
143       throw new UnsupportedOperationException();
144     }
145 
invokeAny(Collection<? extends Callable<T>> tasks)146     @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
147       throw new UnsupportedOperationException();
148     }
149 
invokeAny( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)150     @Override public <T> T invokeAny(
151         Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
152       throw new UnsupportedOperationException();
153     }
154 
isShutdown()155     @Override public boolean isShutdown() {
156       throw new UnsupportedOperationException();
157     }
158 
isTerminated()159     @Override public boolean isTerminated() {
160       throw new UnsupportedOperationException();
161     }
162 
shutdown()163     @Override public void shutdown() {
164       throw new UnsupportedOperationException();
165     }
166 
shutdownNow()167     @Override public List<Runnable> shutdownNow() {
168       throw new UnsupportedOperationException();
169     }
170 
submit(Callable<T> task)171     @Override public <T> Future<T> submit(Callable<T> task) {
172       throw new UnsupportedOperationException();
173     }
174 
submit(Runnable task)175     @Override public Future<?> submit(Runnable task) {
176       throw new UnsupportedOperationException();
177     }
178 
submit(Runnable task, T result)179     @Override public <T> Future<T> submit(Runnable task, T result) {
180       throw new UnsupportedOperationException();
181     }
182 
execute(Runnable command)183     @Override public void execute(Runnable command) {
184       // Since it is being enqueued immediately, no point in tracing the future for cancellation.
185       Future<?> unused = schedule(command, 0, TimeUnit.NANOSECONDS);
186     }
187   }
188 
189   /**
190    * Provides a partially implemented instance of {@link ScheduledExecutorService} that uses the
191    * fake clock ticker for testing.
192    */
getScheduledExecutorService()193   public ScheduledExecutorService getScheduledExecutorService() {
194     return scheduledExecutorService;
195   }
196 
197   /**
198    * Provides a stopwatch instance that uses the fake clock ticker.
199    */
getStopwatchSupplier()200   public Supplier<Stopwatch> getStopwatchSupplier() {
201     return stopwatchSupplier;
202   }
203 
204   /**
205    * Ticker of the FakeClock.
206    */
getTicker()207   public Ticker getTicker() {
208     return ticker;
209   }
210 
211   /**
212    * Run all due tasks.
213    *
214    * @return the number of tasks run by this call
215    */
runDueTasks()216   public int runDueTasks() {
217     int count = 0;
218     while (true) {
219       ScheduledTask task = tasks.peek();
220       if (task == null || task.dueTimeNanos > currentTimeNanos) {
221         break;
222       }
223       if (tasks.remove(task)) {
224         task.command.run();
225         task.complete();
226         count++;
227       }
228     }
229     return count;
230   }
231 
232   /**
233    * Return all due tasks.
234    */
getDueTasks()235   public Collection<ScheduledTask> getDueTasks() {
236     ArrayList<ScheduledTask> result = new ArrayList<>();
237     for (ScheduledTask task : tasks) {
238       if (task.dueTimeNanos > currentTimeNanos) {
239         continue;
240       }
241       result.add(task);
242     }
243     return result;
244   }
245 
246   /**
247    * Return all unrun tasks.
248    */
getPendingTasks()249   public Collection<ScheduledTask> getPendingTasks() {
250     return getPendingTasks(ACCEPT_ALL_FILTER);
251   }
252 
253   /**
254    * Return all unrun tasks accepted by the given filter.
255    */
getPendingTasks(TaskFilter filter)256   public Collection<ScheduledTask> getPendingTasks(TaskFilter filter) {
257     ArrayList<ScheduledTask> result = new ArrayList<>();
258     for (ScheduledTask task : tasks) {
259       if (filter.shouldAccept(task.command)) {
260         result.add(task);
261       }
262     }
263     return result;
264   }
265 
266   /**
267    * Forward the time by the given duration and run all due tasks.
268    *
269    * @return the number of tasks run by this call
270    */
forwardTime(long value, TimeUnit unit)271   public int forwardTime(long value, TimeUnit unit) {
272     currentTimeNanos += unit.toNanos(value);
273     return runDueTasks();
274   }
275 
276   /**
277    * Forward the time by the given nanoseconds and run all due tasks.
278    *
279    * @return the number of tasks run by this call
280    */
forwardNanos(long nanos)281   public int forwardNanos(long nanos) {
282     return forwardTime(nanos, TimeUnit.NANOSECONDS);
283   }
284 
285   /**
286    * Return the number of queued tasks.
287    */
numPendingTasks()288   public int numPendingTasks() {
289     return tasks.size();
290   }
291 
292   /**
293    * Return the number of queued tasks accepted by the given filter.
294    */
numPendingTasks(TaskFilter filter)295   public int numPendingTasks(TaskFilter filter) {
296     int count = 0;
297     for (ScheduledTask task : tasks) {
298       if (filter.shouldAccept(task.command)) {
299         count++;
300       }
301     }
302     return count;
303   }
304 
currentTimeMillis()305   public long currentTimeMillis() {
306     // Normally millis and nanos are of different epochs. Add an offset to simulate that.
307     return TimeUnit.NANOSECONDS.toMillis(currentTimeNanos + 123456789L);
308   }
309 
310   /**
311    * A filter that allows us to have fine grained control over which tasks are accepted for certain
312    * operation.
313    */
314   public interface TaskFilter {
315     /**
316      * Inspect the Runnable and returns true if it should be accepted.
317      */
shouldAccept(Runnable runnable)318     boolean shouldAccept(Runnable runnable);
319   }
320 }
321