/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.AbstractFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A manipulated clock that exports a {@link Ticker} and a {@link ScheduledExecutorService}.
 *
 * <p>To simulate the locking scenario of using real executors, it never runs tasks within {@code
 * schedule()} or {@code execute()}. Instead, you should call {@link #runDueTasks} in your test
 * method to run all due tasks. {@link #forwardTime} and {@link #forwardNanos} call {@link
 * #runDueTasks} automatically.
 *
 * <p>Tasks are run in order of due time; tasks that share the same due time are run in the order
 * in which they were scheduled.
 */
public final class FakeClock {

  /** Filter that accepts every task; backs the no-argument {@link #getPendingTasks()}. */
  private static final TaskFilter ACCEPT_ALL_FILTER = new TaskFilter() {
      @Override
      public boolean shouldAccept(Runnable command) {
        return true;
      }
    };

  private final ScheduledExecutorService scheduledExecutorService = new ScheduledExecutorImpl();

  /** Tasks that have been scheduled but neither run nor cancelled yet. */
  private final PriorityBlockingQueue<ScheduledTask> tasks =
      new PriorityBlockingQueue<ScheduledTask>();

  /**
   * Source of per-task sequence numbers. The priority queue makes no ordering promise for
   * elements that compare as equal, so every task gets a unique, increasing number that is used to
   * break due-time ties in submission order.
   */
  private final AtomicLong nextTaskSequence = new AtomicLong();

  /** Ticker whose reading is the current fake time in nanoseconds. */
  private final Ticker ticker =
      new Ticker() {
        @Override public long read() {
          return currentTimeNanos;
        }
      };

  /** Supplies unstarted stopwatches that are driven by {@link #ticker}. */
  private final Supplier<Stopwatch> stopwatchSupplier =
      new Supplier<Stopwatch>() {
        @Override public Stopwatch get() {
          return Stopwatch.createUnstarted(ticker);
        }
      };

  /** The current fake time. Starts at zero and only changes through {@link #forwardTime}. */
  private long currentTimeNanos;

  /**
   * A command queued on the fake executor together with the fake time at which it becomes due.
   * The future completes (with {@code null}) after {@link #runDueTasks} has run the command.
   */
  public class ScheduledTask extends AbstractFuture<Void> implements ScheduledFuture<Void> {
    public final Runnable command;
    public final long dueTimeNanos;
    /** Creation order of this task within the enclosing clock; used only to break ties. */
    private final long sequence;

    ScheduledTask(long dueTimeNanos, Runnable command) {
      this.dueTimeNanos = dueTimeNanos;
      this.command = command;
      this.sequence = nextTaskSequence.getAndIncrement();
    }

    /** Removes this task from the pending queue, then cancels the future. */
    @Override public boolean cancel(boolean mayInterruptIfRunning) {
      tasks.remove(this);
      return super.cancel(mayInterruptIfRunning);
    }

    /** Returns the remaining fake time until this task is due; negative if it is overdue. */
    @Override public long getDelay(TimeUnit unit) {
      return unit.convert(dueTimeNanos - currentTimeNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Orders tasks by due time, earliest first. Tasks with the same due time are ordered by
     * creation sequence so that they run first-in-first-out.
     *
     * @throws ClassCastException if {@code other} is not a {@link ScheduledTask}
     */
    @Override public int compareTo(Delayed other) {
      ScheduledTask otherTask = (ScheduledTask) other;
      if (dueTimeNanos > otherTask.dueTimeNanos) {
        return 1;
      } else if (dueTimeNanos < otherTask.dueTimeNanos) {
        return -1;
      }
      // Same due time: fall back to submission order.
      if (sequence > otherTask.sequence) {
        return 1;
      } else if (sequence < otherTask.sequence) {
        return -1;
      } else {
        return 0;
      }
    }

    /** Marks the future as successfully completed. */
    void complete() {
      set(null);
    }

    @Override
    public String toString() {
      return "[due=" + dueTimeNanos + ", task=" + command + "]";
    }
  }

  /**
   * Partial {@link ScheduledExecutorService} on top of the fake clock. Only {@link
   * #schedule(Runnable, long, TimeUnit)} and {@link #execute(Runnable)} are supported; every other
   * method throws {@link UnsupportedOperationException}.
   */
  private class ScheduledExecutorImpl implements ScheduledExecutorService {
    @Override public <V> ScheduledFuture<V> schedule(
        Callable<V> callable, long delay, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    /**
     * Queues {@code cmd} to become due {@code delay} after the current fake time. The command is
     * never run here; it runs from {@link FakeClock#runDueTasks}.
     */
    @Override public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
      ScheduledTask task = new ScheduledTask(currentTimeNanos + unit.toNanos(delay), cmd);
      tasks.add(task);
      return task;
    }

    @Override public ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command, long initialDelay, long period, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public boolean awaitTermination(long timeout, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> List<Future<T>> invokeAll(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> T invokeAny(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
      throw new UnsupportedOperationException();
    }

    @Override public boolean isShutdown() {
      throw new UnsupportedOperationException();
    }

    @Override public boolean isTerminated() {
      throw new UnsupportedOperationException();
    }

    @Override public void shutdown() {
      throw new UnsupportedOperationException();
    }

    @Override public List<Runnable> shutdownNow() {
      throw new UnsupportedOperationException();
    }

    @Override public <T> Future<T> submit(Callable<T> task) {
      throw new UnsupportedOperationException();
    }

    @Override public Future<?> submit(Runnable task) {
      throw new UnsupportedOperationException();
    }

    @Override public <T> Future<T> submit(Runnable task, T result) {
      throw new UnsupportedOperationException();
    }

    /** Queues {@code command} as a task that is due at the current fake time. */
    @Override public void execute(Runnable command) {
      // Since it is being enqueued immediately, no point in tracing the future for cancellation.
      Future<?> unused = schedule(command, 0, TimeUnit.NANOSECONDS);
    }
  }

  /**
   * Provides a partially implemented instance of {@link ScheduledExecutorService} that uses the
   * fake clock ticker for testing.
   */
  public ScheduledExecutorService getScheduledExecutorService() {
    return scheduledExecutorService;
  }

  /**
   * Provides a stopwatch instance that uses the fake clock ticker.
   */
  public Supplier<Stopwatch> getStopwatchSupplier() {
    return stopwatchSupplier;
  }

  /**
   * Ticker of the FakeClock.
   */
  public Ticker getTicker() {
    return ticker;
  }

  /**
   * Run all due tasks, earliest due time first and in submission order among tasks with the same
   * due time. Tasks that become due while this method is running (for example, tasks queued by
   * another task with no delay) are run as well.
   *
   * <p>If a command throws, the exception propagates to the caller; that task has already been
   * removed from the queue and its future is left incomplete.
   *
   * @return the number of tasks run by this call
   */
  public int runDueTasks() {
    int count = 0;
    while (true) {
      ScheduledTask task = tasks.peek();
      if (task == null || task.dueTimeNanos > currentTimeNanos) {
        break;
      }
      // Only run the task if we are the one who took it off the queue; it may have been removed
      // (e.g. cancelled) between peek() and remove().
      if (tasks.remove(task)) {
        task.command.run();
        task.complete();
        count++;
      }
    }
    return count;
  }

  /**
   * Return all due tasks, i.e. queued tasks whose due time is not after the current fake time.
   * The returned collection is a snapshot in the queue's iteration order, which is not guaranteed
   * to be sorted.
   */
  public Collection<ScheduledTask> getDueTasks() {
    ArrayList<ScheduledTask> result = new ArrayList<>();
    for (ScheduledTask task : tasks) {
      if (task.dueTimeNanos > currentTimeNanos) {
        continue;
      }
      result.add(task);
    }
    return result;
  }

  /**
   * Return all unrun tasks.
   */
  public Collection<ScheduledTask> getPendingTasks() {
    return getPendingTasks(ACCEPT_ALL_FILTER);
  }

  /**
   * Return all unrun tasks accepted by the given filter. The returned collection is a snapshot in
   * the queue's iteration order, which is not guaranteed to be sorted.
   */
  public Collection<ScheduledTask> getPendingTasks(TaskFilter filter) {
    ArrayList<ScheduledTask> result = new ArrayList<>();
    for (ScheduledTask task : tasks) {
      if (filter.shouldAccept(task.command)) {
        result.add(task);
      }
    }
    return result;
  }

  /**
   * Forward the time by the given duration and run all due tasks.
   *
   * @return the number of tasks run by this call
   */
  public int forwardTime(long value, TimeUnit unit) {
    currentTimeNanos += unit.toNanos(value);
    return runDueTasks();
  }

  /**
   * Forward the time by the given nanoseconds and run all due tasks.
   *
   * @return the number of tasks run by this call
   */
  public int forwardNanos(long nanos) {
    return forwardTime(nanos, TimeUnit.NANOSECONDS);
  }

  /**
   * Return the number of queued tasks.
   */
  public int numPendingTasks() {
    return tasks.size();
  }

  /**
   * Return the number of queued tasks accepted by the given filter.
   */
  public int numPendingTasks(TaskFilter filter) {
    int count = 0;
    for (ScheduledTask task : tasks) {
      if (filter.shouldAccept(task.command)) {
        count++;
      }
    }
    return count;
  }

  /**
   * Return the current fake time in milliseconds. A fixed offset is added to the nanosecond clock
   * before conversion, so this value is deliberately not the ticker reading divided by 1e6.
   */
  public long currentTimeMillis() {
    // Normally millis and nanos are of different epochs. Add an offset to simulate that.
    return TimeUnit.NANOSECONDS.toMillis(currentTimeNanos + 123456789L);
  }

  /**
   * A filter that allows us to have fine grained control over which tasks are accepted for certain
   * operation.
   */
  public interface TaskFilter {
    /**
     * Inspect the Runnable and returns true if it should be accepted.
     */
    boolean shouldAccept(Runnable runnable);
  }
}