• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Stopwatch;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import io.grpc.Status;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.concurrent.GuardedBy;
30 
31 /**
32  * Manages keepalive pings.
33  */
34 public class KeepAliveManager {
35   private static final long MIN_KEEPALIVE_TIME_NANOS = TimeUnit.SECONDS.toNanos(10);
36   private static final long MIN_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
37 
38   private final ScheduledExecutorService scheduler;
39   @GuardedBy("this")
40   private final Stopwatch stopwatch;
41   private final KeepAlivePinger keepAlivePinger;
42   private final boolean keepAliveDuringTransportIdle;
43   @GuardedBy("this")
44   private State state = State.IDLE;
45   @GuardedBy("this")
46   private ScheduledFuture<?> shutdownFuture;
47   @GuardedBy("this")
48   private ScheduledFuture<?> pingFuture;
49   private final Runnable shutdown = new LogExceptionRunnable(new Runnable() {
50     @Override
51     public void run() {
52       boolean shouldShutdown = false;
53       synchronized (KeepAliveManager.this) {
54         if (state != State.DISCONNECTED) {
55           // We haven't received a ping response within the timeout. The connection is likely gone
56           // already. Shutdown the transport and fail all existing rpcs.
57           state = State.DISCONNECTED;
58           shouldShutdown = true;
59         }
60       }
61       if (shouldShutdown) {
62         keepAlivePinger.onPingTimeout();
63       }
64     }
65   });
66   private final Runnable sendPing = new LogExceptionRunnable(new Runnable() {
67     @Override
68     public void run() {
69       boolean shouldSendPing = false;
70       synchronized (KeepAliveManager.this) {
71         pingFuture = null;
72         if (state == State.PING_SCHEDULED) {
73           shouldSendPing = true;
74           state = State.PING_SENT;
75           // Schedule a shutdown. It fires if we don't receive the ping response within the timeout.
76           shutdownFuture = scheduler.schedule(shutdown, keepAliveTimeoutInNanos,
77               TimeUnit.NANOSECONDS);
78         } else if (state == State.PING_DELAYED) {
79           // We have received some data. Reschedule the ping with the new time.
80           pingFuture = scheduler.schedule(
81               sendPing,
82               keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
83               TimeUnit.NANOSECONDS);
84           state = State.PING_SCHEDULED;
85         }
86       }
87       if (shouldSendPing) {
88         // Send the ping.
89         keepAlivePinger.ping();
90       }
91     }
92   });
93 
94   private final long keepAliveTimeInNanos;
95   private final long keepAliveTimeoutInNanos;
96 
97   private enum State {
98     /*
99      * We don't need to do any keepalives. This means the transport has no active rpcs and
100      * keepAliveDuringTransportIdle == false.
101      */
102     IDLE,
103     /*
104      * We have scheduled a ping to be sent in the future. We may decide to delay it if we receive
105      * some data.
106      */
107     PING_SCHEDULED,
108     /*
109      * We need to delay the scheduled keepalive ping.
110      */
111     PING_DELAYED,
112     /*
113      * The ping has been sent out. Waiting for a ping response.
114      */
115     PING_SENT,
116     /*
117      * Transport goes idle after ping has been sent.
118      */
119     IDLE_AND_PING_SENT,
120     /*
121      * The transport has been disconnected. We won't do keepalives any more.
122      */
123     DISCONNECTED,
124   }
125 
126   /**
127    * Creates a KeepAliverManager.
128    */
KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, boolean keepAliveDuringTransportIdle)129   public KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler,
130                           long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
131                           boolean keepAliveDuringTransportIdle) {
132     this(keepAlivePinger, scheduler, Stopwatch.createUnstarted(), keepAliveTimeInNanos,
133         keepAliveTimeoutInNanos,
134         keepAliveDuringTransportIdle);
135   }
136 
137   @VisibleForTesting
KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, Stopwatch stopwatch, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, boolean keepAliveDuringTransportIdle)138   KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler,
139       Stopwatch stopwatch, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
140                    boolean keepAliveDuringTransportIdle) {
141     this.keepAlivePinger = checkNotNull(keepAlivePinger, "keepAlivePinger");
142     this.scheduler = checkNotNull(scheduler, "scheduler");
143     this.stopwatch = checkNotNull(stopwatch, "stopwatch");
144     this.keepAliveTimeInNanos = keepAliveTimeInNanos;
145     this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
146     this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;
147     stopwatch.reset().start();
148   }
149 
150   /** Start keepalive monitoring. */
onTransportStarted()151   public synchronized void onTransportStarted() {
152     if (keepAliveDuringTransportIdle) {
153       onTransportActive();
154     }
155   }
156 
157   /**
158    * Transport has received some data so that we can delay sending keepalives.
159    */
onDataReceived()160   public synchronized void onDataReceived() {
161     stopwatch.reset().start();
162     // We do not cancel the ping future here. This avoids constantly scheduling and cancellation in
163     // a busy transport. Instead, we update the status here and reschedule later. So we actually
164     // keep one sendPing task always in flight when there're active rpcs.
165     if (state == State.PING_SCHEDULED) {
166       state = State.PING_DELAYED;
167     } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
168       // Ping acked or effectively ping acked. Cancel shutdown, and then if not idle,
169       // schedule a new keep-alive ping.
170       if (shutdownFuture != null) {
171         shutdownFuture.cancel(false);
172       }
173       if (state == State.IDLE_AND_PING_SENT) {
174         // not to schedule new pings until onTransportActive
175         state = State.IDLE;
176         return;
177       }
178       // schedule a new ping
179       state = State.PING_SCHEDULED;
180       checkState(pingFuture == null, "There should be no outstanding pingFuture");
181       pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS);
182     }
183   }
184 
185   /**
186    * Transport has active streams. Start sending keepalives if necessary.
187    */
onTransportActive()188   public synchronized void onTransportActive() {
189     if (state == State.IDLE) {
190       // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
191       // quickly check whether the connection is still working.
192       state = State.PING_SCHEDULED;
193       if (pingFuture == null) {
194         pingFuture = scheduler.schedule(
195             sendPing,
196             keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
197             TimeUnit.NANOSECONDS);
198       }
199     } else if (state == State.IDLE_AND_PING_SENT) {
200       state = State.PING_SENT;
201     } // Other states are possible when keepAliveDuringTransportIdle == true
202   }
203 
204   /**
205    * Transport has finished all streams.
206    */
onTransportIdle()207   public synchronized void onTransportIdle() {
208     if (keepAliveDuringTransportIdle) {
209       return;
210     }
211     if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
212       state = State.IDLE;
213     }
214     if (state == State.PING_SENT) {
215       state = State.IDLE_AND_PING_SENT;
216     }
217   }
218 
219   /**
220    * Transport is being terminated. We no longer need to do keepalives.
221    */
onTransportTermination()222   public synchronized void onTransportTermination() {
223     if (state != State.DISCONNECTED) {
224       state = State.DISCONNECTED;
225       if (shutdownFuture != null) {
226         shutdownFuture.cancel(false);
227       }
228       if (pingFuture != null) {
229         pingFuture.cancel(false);
230         pingFuture = null;
231       }
232     }
233   }
234 
235   /**
236    * Bumps keepalive time to 10 seconds if the specified value was smaller than that.
237    */
clampKeepAliveTimeInNanos(long keepAliveTimeInNanos)238   public static long clampKeepAliveTimeInNanos(long keepAliveTimeInNanos) {
239     return Math.max(keepAliveTimeInNanos, MIN_KEEPALIVE_TIME_NANOS);
240   }
241 
242   /**
243    * Bumps keepalive timeout to 10 milliseconds if the specified value was smaller than that.
244    */
clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos)245   public static long clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos) {
246     return Math.max(keepAliveTimeoutInNanos, MIN_KEEPALIVE_TIMEOUT_NANOS);
247   }
248 
249   public interface KeepAlivePinger {
250     /**
251      * Sends out a keep-alive ping.
252      */
ping()253     void ping();
254 
255     /**
256      * Callback when Ping Ack was not received in KEEPALIVE_TIMEOUT. Should shutdown the transport.
257      */
onPingTimeout()258     void onPingTimeout();
259   }
260 
261   /**
262    * Default client side {@link KeepAlivePinger}.
263    */
264   public static final class ClientKeepAlivePinger implements KeepAlivePinger {
265     private final ConnectionClientTransport transport;
266 
ClientKeepAlivePinger(ConnectionClientTransport transport)267     public ClientKeepAlivePinger(ConnectionClientTransport transport) {
268       this.transport = transport;
269     }
270 
271     @Override
ping()272     public void ping() {
273       transport.ping(new ClientTransport.PingCallback() {
274         @Override
275         public void onSuccess(long roundTripTimeNanos) {}
276 
277         @Override
278         public void onFailure(Throwable cause) {
279           transport.shutdownNow(Status.UNAVAILABLE.withDescription(
280               "Keepalive failed. The connection is likely gone"));
281         }
282       }, MoreExecutors.directExecutor());
283     }
284 
285     @Override
onPingTimeout()286     public void onPingTimeout() {
287       transport.shutdownNow(Status.UNAVAILABLE.withDescription(
288           "Keepalive failed. The connection is likely gone"));
289     }
290   }
291 }
292 
293