1 /* 2 * Copyright 2018 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.Stopwatch; 21 import java.util.concurrent.Executor; 22 import java.util.concurrent.ScheduledExecutorService; 23 import java.util.concurrent.ScheduledFuture; 24 import java.util.concurrent.TimeUnit; 25 26 /** 27 * Reschedules a runnable lazily. 28 */ 29 final class Rescheduler { 30 31 // deps 32 private final ScheduledExecutorService scheduler; 33 private final Executor serializingExecutor; 34 private final Runnable runnable; 35 36 // state 37 private final Stopwatch stopwatch; 38 private long runAtNanos; 39 private boolean enabled; 40 private ScheduledFuture<?> wakeUp; 41 Rescheduler( Runnable r, Executor serializingExecutor, ScheduledExecutorService scheduler, Stopwatch stopwatch)42 Rescheduler( 43 Runnable r, 44 Executor serializingExecutor, 45 ScheduledExecutorService scheduler, 46 Stopwatch stopwatch) { 47 this.runnable = r; 48 this.serializingExecutor = serializingExecutor; 49 this.scheduler = scheduler; 50 this.stopwatch = stopwatch; 51 stopwatch.start(); 52 } 53 54 /* must be called from the {@link #serializingExecutor} originally passed in. */ reschedule(long delay, TimeUnit timeUnit)55 void reschedule(long delay, TimeUnit timeUnit) { 56 long delayNanos = timeUnit.toNanos(delay); 57 long newRunAtNanos = nanoTime() + delayNanos; 58 enabled = true; 59 if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) { 60 if (wakeUp != null) { 61 wakeUp.cancel(false); 62 } 63 wakeUp = scheduler.schedule(new FutureRunnable(), delayNanos, TimeUnit.NANOSECONDS); 64 } 65 runAtNanos = newRunAtNanos; 66 } 67 68 // must be called from channel executor cancel(boolean permanent)69 void cancel(boolean permanent) { 70 enabled = false; 71 if (permanent && wakeUp != null) { 72 wakeUp.cancel(false); 73 wakeUp = null; 74 } 75 } 76 77 private final class FutureRunnable implements Runnable { 78 @Override run()79 public void run() { 80 Rescheduler.this.serializingExecutor.execute(new ChannelFutureRunnable()); 81 } 82 isEnabled()83 private boolean isEnabled() { 84 return Rescheduler.this.enabled; 85 } 86 } 87 88 private final class ChannelFutureRunnable implements Runnable { 89 90 @Override run()91 public void run() { 92 if (!enabled) { 93 wakeUp = null; 94 return; 95 } 96 long now = nanoTime(); 97 if (runAtNanos - now > 0) { 98 wakeUp = scheduler.schedule( 99 new FutureRunnable(), runAtNanos - now, TimeUnit.NANOSECONDS); 100 } else { 101 enabled = false; 102 wakeUp = null; 103 runnable.run(); 104 } 105 } 106 } 107 108 @VisibleForTesting isEnabled(Runnable r)109 static boolean isEnabled(Runnable r) { 110 return ((FutureRunnable) r).isEnabled(); 111 } 112 nanoTime()113 private long nanoTime() { 114 return stopwatch.elapsed(TimeUnit.NANOSECONDS); 115 } 116 } 117