/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import io.grpc.Status;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.TimeUnit;
28 
29 /**
30  * Manages keepalive pings.
31  */
32 public class KeepAliveManager {
33   private static final SystemTicker SYSTEM_TICKER = new SystemTicker();
34   private static final long MIN_KEEPALIVE_TIME_NANOS = TimeUnit.SECONDS.toNanos(10);
35   private static final long MIN_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
36 
37   private final ScheduledExecutorService scheduler;
38   private final Ticker ticker;
39   private final KeepAlivePinger keepAlivePinger;
40   private final boolean keepAliveDuringTransportIdle;
41   private State state = State.IDLE;
42   private long nextKeepaliveTime;
43   private ScheduledFuture<?> shutdownFuture;
44   private ScheduledFuture<?> pingFuture;
45   private final Runnable shutdown = new LogExceptionRunnable(new Runnable() {
46     @Override
47     public void run() {
48       boolean shouldShutdown = false;
49       synchronized (KeepAliveManager.this) {
50         if (state != State.DISCONNECTED) {
51           // We haven't received a ping response within the timeout. The connection is likely gone
52           // already. Shutdown the transport and fail all existing rpcs.
53           state = State.DISCONNECTED;
54           shouldShutdown = true;
55         }
56       }
57       if (shouldShutdown) {
58         keepAlivePinger.onPingTimeout();
59       }
60     }
61   });
62   private final Runnable sendPing = new LogExceptionRunnable(new Runnable() {
63     @Override
64     public void run() {
65       pingFuture = null;
66       boolean shouldSendPing = false;
67       synchronized (KeepAliveManager.this) {
68         if (state == State.PING_SCHEDULED) {
69           shouldSendPing = true;
70           state = State.PING_SENT;
71           // Schedule a shutdown. It fires if we don't receive the ping response within the timeout.
72           shutdownFuture = scheduler.schedule(shutdown, keepAliveTimeoutInNanos,
73               TimeUnit.NANOSECONDS);
74         } else if (state == State.PING_DELAYED) {
75           // We have received some data. Reschedule the ping with the new time.
76           pingFuture = scheduler.schedule(
77               sendPing,
78               nextKeepaliveTime - ticker.read(),
79               TimeUnit.NANOSECONDS);
80           state = State.PING_SCHEDULED;
81         }
82       }
83       if (shouldSendPing) {
84         // Send the ping.
85         keepAlivePinger.ping();
86       }
87     }
88   });
89 
90   private long keepAliveTimeInNanos;
91   private long keepAliveTimeoutInNanos;
92 
93   private enum State {
94     /*
95      * We don't need to do any keepalives. This means the transport has no active rpcs and
96      * keepAliveDuringTransportIdle == false.
97      */
98     IDLE,
99     /*
100      * We have scheduled a ping to be sent in the future. We may decide to delay it if we receive
101      * some data.
102      */
103     PING_SCHEDULED,
104     /*
105      * We need to delay the scheduled keepalive ping.
106      */
107     PING_DELAYED,
108     /*
109      * The ping has been sent out. Waiting for a ping response.
110      */
111     PING_SENT,
112     /*
113      * Transport goes idle after ping has been sent.
114      */
115     IDLE_AND_PING_SENT,
116     /*
117      * The transport has been disconnected. We won't do keepalives any more.
118      */
119     DISCONNECTED,
120   }
121 
122   /**
123    * Creates a KeepAliverManager.
124    */
KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, boolean keepAliveDuringTransportIdle)125   public KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler,
126                           long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
127                           boolean keepAliveDuringTransportIdle) {
128     this(keepAlivePinger, scheduler, SYSTEM_TICKER, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
129         keepAliveDuringTransportIdle);
130   }
131 
132   @VisibleForTesting
KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, Ticker ticker, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, boolean keepAliveDuringTransportIdle)133   KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler,
134                    Ticker ticker, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
135                    boolean keepAliveDuringTransportIdle) {
136     this.keepAlivePinger = checkNotNull(keepAlivePinger, "keepAlivePinger");
137     this.scheduler = checkNotNull(scheduler, "scheduler");
138     this.ticker = checkNotNull(ticker, "ticker");
139     this.keepAliveTimeInNanos = keepAliveTimeInNanos;
140     this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
141     this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;
142     nextKeepaliveTime = ticker.read() + keepAliveTimeInNanos;
143   }
144 
145   /** Start keepalive monitoring. */
onTransportStarted()146   public synchronized void onTransportStarted() {
147     if (keepAliveDuringTransportIdle) {
148       onTransportActive();
149     }
150   }
151 
152   /**
153    * Transport has received some data so that we can delay sending keepalives.
154    */
onDataReceived()155   public synchronized void onDataReceived() {
156     nextKeepaliveTime = ticker.read() + keepAliveTimeInNanos;
157     // We do not cancel the ping future here. This avoids constantly scheduling and cancellation in
158     // a busy transport. Instead, we update the status here and reschedule later. So we actually
159     // keep one sendPing task always in flight when there're active rpcs.
160     if (state == State.PING_SCHEDULED) {
161       state = State.PING_DELAYED;
162     } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
163       // Ping acked or effectively ping acked. Cancel shutdown, and then if not idle,
164       // schedule a new keep-alive ping.
165       if (shutdownFuture != null) {
166         shutdownFuture.cancel(false);
167       }
168       if (state == State.IDLE_AND_PING_SENT) {
169         // not to schedule new pings until onTransportActive
170         state = State.IDLE;
171         return;
172       }
173       // schedule a new ping
174       state = State.PING_SCHEDULED;
175       checkState(pingFuture == null, "There should be no outstanding pingFuture");
176       pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS);
177     }
178   }
179 
180   /**
181    * Transport has active streams. Start sending keepalives if necessary.
182    */
onTransportActive()183   public synchronized void onTransportActive() {
184     if (state == State.IDLE) {
185       // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
186       // quickly check whether the connection is still working.
187       state = State.PING_SCHEDULED;
188       if (pingFuture == null) {
189         pingFuture = scheduler.schedule(
190             sendPing,
191             nextKeepaliveTime - ticker.read(),
192             TimeUnit.NANOSECONDS);
193       }
194     } else if (state == State.IDLE_AND_PING_SENT) {
195       state = State.PING_SENT;
196     } // Other states are possible when keepAliveDuringTransportIdle == true
197   }
198 
199   /**
200    * Transport has finished all streams.
201    */
onTransportIdle()202   public synchronized void onTransportIdle() {
203     if (keepAliveDuringTransportIdle) {
204       return;
205     }
206     if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
207       state = State.IDLE;
208     }
209     if (state == State.PING_SENT) {
210       state = State.IDLE_AND_PING_SENT;
211     }
212   }
213 
214   /**
215    * Transport is being terminated. We no longer need to do keepalives.
216    */
onTransportTermination()217   public synchronized void onTransportTermination() {
218     if (state != State.DISCONNECTED) {
219       state = State.DISCONNECTED;
220       if (shutdownFuture != null) {
221         shutdownFuture.cancel(false);
222       }
223       if (pingFuture != null) {
224         pingFuture.cancel(false);
225         pingFuture = null;
226       }
227     }
228   }
229 
230   /**
231    * Bumps keepalive time to 10 seconds if the specified value was smaller than that.
232    */
clampKeepAliveTimeInNanos(long keepAliveTimeInNanos)233   public static long clampKeepAliveTimeInNanos(long keepAliveTimeInNanos) {
234     return Math.max(keepAliveTimeInNanos, MIN_KEEPALIVE_TIME_NANOS);
235   }
236 
237   /**
238    * Bumps keepalive timeout to 10 milliseconds if the specified value was smaller than that.
239    */
clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos)240   public static long clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos) {
241     return Math.max(keepAliveTimeoutInNanos, MIN_KEEPALIVE_TIMEOUT_NANOS);
242   }
243 
244   public interface KeepAlivePinger {
245     /**
246      * Sends out a keep-alive ping.
247      */
ping()248     void ping();
249 
250     /**
251      * Callback when Ping Ack was not received in KEEPALIVE_TIMEOUT. Should shutdown the transport.
252      */
onPingTimeout()253     void onPingTimeout();
254   }
255 
256   /**
257    * Default client side {@link KeepAlivePinger}.
258    */
259   public static final class ClientKeepAlivePinger implements KeepAlivePinger {
260     private final ConnectionClientTransport transport;
261 
ClientKeepAlivePinger(ConnectionClientTransport transport)262     public ClientKeepAlivePinger(ConnectionClientTransport transport) {
263       this.transport = transport;
264     }
265 
266     @Override
ping()267     public void ping() {
268       transport.ping(new ClientTransport.PingCallback() {
269         @Override
270         public void onSuccess(long roundTripTimeNanos) {}
271 
272         @Override
273         public void onFailure(Throwable cause) {
274           transport.shutdownNow(Status.UNAVAILABLE.withDescription(
275               "Keepalive failed. The connection is likely gone"));
276         }
277       }, MoreExecutors.directExecutor());
278     }
279 
280     @Override
onPingTimeout()281     public void onPingTimeout() {
282       transport.shutdownNow(Status.UNAVAILABLE.withDescription(
283           "Keepalive failed. The connection is likely gone"));
284     }
285   }
286 
287   // TODO(zsurocking): Classes below are copied from Deadline.java. We should consider share the
288   // code.
289 
290   /** Time source representing nanoseconds since fixed but arbitrary point in time. */
291   abstract static class Ticker {
292     /** Returns the number of nanoseconds since this source's epoch. */
read()293     public abstract long read();
294   }
295 
296   private static class SystemTicker extends Ticker {
297     @Override
read()298     public long read() {
299       return System.nanoTime();
300     }
301   }
302 }
303 
304