• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2018 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Stopwatch;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 
26 /**
27  * Reschedules a runnable lazily.
28  */
29 final class Rescheduler {
30 
31   // deps
32   private final ScheduledExecutorService scheduler;
33   private final Executor serializingExecutor;
34   private final Runnable runnable;
35 
36   // state
37   private final Stopwatch stopwatch;
38   private long runAtNanos;
39   private boolean enabled;
40   private ScheduledFuture<?> wakeUp;
41 
Rescheduler( Runnable r, Executor serializingExecutor, ScheduledExecutorService scheduler, Stopwatch stopwatch)42   Rescheduler(
43       Runnable r,
44       Executor serializingExecutor,
45       ScheduledExecutorService scheduler,
46       Stopwatch stopwatch) {
47     this.runnable = r;
48     this.serializingExecutor = serializingExecutor;
49     this.scheduler = scheduler;
50     this.stopwatch = stopwatch;
51     stopwatch.start();
52   }
53 
54   /* must be called from the {@link #serializingExecutor} originally passed in. */
reschedule(long delay, TimeUnit timeUnit)55   void reschedule(long delay, TimeUnit timeUnit) {
56     long delayNanos = timeUnit.toNanos(delay);
57     long newRunAtNanos = nanoTime() + delayNanos;
58     enabled = true;
59     if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) {
60       if (wakeUp != null) {
61         wakeUp.cancel(false);
62       }
63       wakeUp = scheduler.schedule(new FutureRunnable(), delayNanos, TimeUnit.NANOSECONDS);
64     }
65     runAtNanos = newRunAtNanos;
66   }
67 
68   // must be called from channel executor
cancel(boolean permanent)69   void cancel(boolean permanent) {
70     enabled = false;
71     if (permanent && wakeUp != null) {
72       wakeUp.cancel(false);
73       wakeUp = null;
74     }
75   }
76 
77   private final class FutureRunnable implements Runnable {
78     @Override
run()79     public void run() {
80       Rescheduler.this.serializingExecutor.execute(new ChannelFutureRunnable());
81     }
82 
isEnabled()83     private boolean isEnabled() {
84       return Rescheduler.this.enabled;
85     }
86   }
87 
88   private final class ChannelFutureRunnable implements Runnable {
89 
90     @Override
run()91     public void run() {
92       if (!enabled) {
93         wakeUp = null;
94         return;
95       }
96       long now = nanoTime();
97       if (runAtNanos - now > 0) {
98         wakeUp = scheduler.schedule(
99             new FutureRunnable(), runAtNanos - now,  TimeUnit.NANOSECONDS);
100       } else {
101         enabled = false;
102         wakeUp = null;
103         runnable.run();
104       }
105     }
106   }
107 
108   @VisibleForTesting
isEnabled(Runnable r)109   static boolean isEnabled(Runnable r) {
110     return ((FutureRunnable) r).isEnabled();
111   }
112 
nanoTime()113   private long nanoTime() {
114     return stopwatch.elapsed(TimeUnit.NANOSECONDS);
115   }
116 }
117