1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.util.concurrent.MoreExecutors; 24 import io.grpc.Status; 25 import java.util.concurrent.ScheduledExecutorService; 26 import java.util.concurrent.ScheduledFuture; 27 import java.util.concurrent.TimeUnit; 28 29 /** 30 * Manages keepalive pings. 31 */ 32 public class KeepAliveManager { 33 private static final SystemTicker SYSTEM_TICKER = new SystemTicker(); 34 private static final long MIN_KEEPALIVE_TIME_NANOS = TimeUnit.SECONDS.toNanos(10); 35 private static final long MIN_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(10L); 36 37 private final ScheduledExecutorService scheduler; 38 private final Ticker ticker; 39 private final KeepAlivePinger keepAlivePinger; 40 private final boolean keepAliveDuringTransportIdle; 41 private State state = State.IDLE; 42 private long nextKeepaliveTime; 43 private ScheduledFuture<?> shutdownFuture; 44 private ScheduledFuture<?> pingFuture; 45 private final Runnable shutdown = new LogExceptionRunnable(new Runnable() { 46 @Override 47 public void run() { 48 boolean shouldShutdown = false; 49 synchronized (KeepAliveManager.this) { 50 if (state != State.DISCONNECTED) { 51 // We haven't received a ping response within the timeout. The connection is likely gone 52 // already. Shutdown the transport and fail all existing rpcs. 53 state = State.DISCONNECTED; 54 shouldShutdown = true; 55 } 56 } 57 if (shouldShutdown) { 58 keepAlivePinger.onPingTimeout(); 59 } 60 } 61 }); 62 private final Runnable sendPing = new LogExceptionRunnable(new Runnable() { 63 @Override 64 public void run() { 65 pingFuture = null; 66 boolean shouldSendPing = false; 67 synchronized (KeepAliveManager.this) { 68 if (state == State.PING_SCHEDULED) { 69 shouldSendPing = true; 70 state = State.PING_SENT; 71 // Schedule a shutdown. It fires if we don't receive the ping response within the timeout. 72 shutdownFuture = scheduler.schedule(shutdown, keepAliveTimeoutInNanos, 73 TimeUnit.NANOSECONDS); 74 } else if (state == State.PING_DELAYED) { 75 // We have received some data. Reschedule the ping with the new time. 76 pingFuture = scheduler.schedule( 77 sendPing, 78 nextKeepaliveTime - ticker.read(), 79 TimeUnit.NANOSECONDS); 80 state = State.PING_SCHEDULED; 81 } 82 } 83 if (shouldSendPing) { 84 // Send the ping. 85 keepAlivePinger.ping(); 86 } 87 } 88 }); 89 90 private long keepAliveTimeInNanos; 91 private long keepAliveTimeoutInNanos; 92 93 private enum State { 94 /* 95 * We don't need to do any keepalives. This means the transport has no active rpcs and 96 * keepAliveDuringTransportIdle == false. 97 */ 98 IDLE, 99 /* 100 * We have scheduled a ping to be sent in the future. We may decide to delay it if we receive 101 * some data. 102 */ 103 PING_SCHEDULED, 104 /* 105 * We need to delay the scheduled keepalive ping. 106 */ 107 PING_DELAYED, 108 /* 109 * The ping has been sent out. Waiting for a ping response. 110 */ 111 PING_SENT, 112 /* 113 * Transport goes idle after ping has been sent. 114 */ 115 IDLE_AND_PING_SENT, 116 /* 117 * The transport has been disconnected. We won't do keepalives any more. 118 */ 119 DISCONNECTED, 120 } 121 122 /** 123 * Creates a KeepAliverManager. 124 */ KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, boolean keepAliveDuringTransportIdle)125 public KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, 126 long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, 127 boolean keepAliveDuringTransportIdle) { 128 this(keepAlivePinger, scheduler, SYSTEM_TICKER, keepAliveTimeInNanos, keepAliveTimeoutInNanos, 129 keepAliveDuringTransportIdle); 130 } 131 132 @VisibleForTesting KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, Ticker ticker, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, boolean keepAliveDuringTransportIdle)133 KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler, 134 Ticker ticker, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, 135 boolean keepAliveDuringTransportIdle) { 136 this.keepAlivePinger = checkNotNull(keepAlivePinger, "keepAlivePinger"); 137 this.scheduler = checkNotNull(scheduler, "scheduler"); 138 this.ticker = checkNotNull(ticker, "ticker"); 139 this.keepAliveTimeInNanos = keepAliveTimeInNanos; 140 this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; 141 this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle; 142 nextKeepaliveTime = ticker.read() + keepAliveTimeInNanos; 143 } 144 145 /** Start keepalive monitoring. */ onTransportStarted()146 public synchronized void onTransportStarted() { 147 if (keepAliveDuringTransportIdle) { 148 onTransportActive(); 149 } 150 } 151 152 /** 153 * Transport has received some data so that we can delay sending keepalives. 154 */ onDataReceived()155 public synchronized void onDataReceived() { 156 nextKeepaliveTime = ticker.read() + keepAliveTimeInNanos; 157 // We do not cancel the ping future here. This avoids constantly scheduling and cancellation in 158 // a busy transport. Instead, we update the status here and reschedule later. So we actually 159 // keep one sendPing task always in flight when there're active rpcs. 160 if (state == State.PING_SCHEDULED) { 161 state = State.PING_DELAYED; 162 } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) { 163 // Ping acked or effectively ping acked. Cancel shutdown, and then if not idle, 164 // schedule a new keep-alive ping. 165 if (shutdownFuture != null) { 166 shutdownFuture.cancel(false); 167 } 168 if (state == State.IDLE_AND_PING_SENT) { 169 // not to schedule new pings until onTransportActive 170 state = State.IDLE; 171 return; 172 } 173 // schedule a new ping 174 state = State.PING_SCHEDULED; 175 checkState(pingFuture == null, "There should be no outstanding pingFuture"); 176 pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS); 177 } 178 } 179 180 /** 181 * Transport has active streams. Start sending keepalives if necessary. 182 */ onTransportActive()183 public synchronized void onTransportActive() { 184 if (state == State.IDLE) { 185 // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to 186 // quickly check whether the connection is still working. 187 state = State.PING_SCHEDULED; 188 if (pingFuture == null) { 189 pingFuture = scheduler.schedule( 190 sendPing, 191 nextKeepaliveTime - ticker.read(), 192 TimeUnit.NANOSECONDS); 193 } 194 } else if (state == State.IDLE_AND_PING_SENT) { 195 state = State.PING_SENT; 196 } // Other states are possible when keepAliveDuringTransportIdle == true 197 } 198 199 /** 200 * Transport has finished all streams. 201 */ onTransportIdle()202 public synchronized void onTransportIdle() { 203 if (keepAliveDuringTransportIdle) { 204 return; 205 } 206 if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) { 207 state = State.IDLE; 208 } 209 if (state == State.PING_SENT) { 210 state = State.IDLE_AND_PING_SENT; 211 } 212 } 213 214 /** 215 * Transport is being terminated. We no longer need to do keepalives. 216 */ onTransportTermination()217 public synchronized void onTransportTermination() { 218 if (state != State.DISCONNECTED) { 219 state = State.DISCONNECTED; 220 if (shutdownFuture != null) { 221 shutdownFuture.cancel(false); 222 } 223 if (pingFuture != null) { 224 pingFuture.cancel(false); 225 pingFuture = null; 226 } 227 } 228 } 229 230 /** 231 * Bumps keepalive time to 10 seconds if the specified value was smaller than that. 232 */ clampKeepAliveTimeInNanos(long keepAliveTimeInNanos)233 public static long clampKeepAliveTimeInNanos(long keepAliveTimeInNanos) { 234 return Math.max(keepAliveTimeInNanos, MIN_KEEPALIVE_TIME_NANOS); 235 } 236 237 /** 238 * Bumps keepalive timeout to 10 milliseconds if the specified value was smaller than that. 239 */ clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos)240 public static long clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos) { 241 return Math.max(keepAliveTimeoutInNanos, MIN_KEEPALIVE_TIMEOUT_NANOS); 242 } 243 244 public interface KeepAlivePinger { 245 /** 246 * Sends out a keep-alive ping. 247 */ ping()248 void ping(); 249 250 /** 251 * Callback when Ping Ack was not received in KEEPALIVE_TIMEOUT. Should shutdown the transport. 252 */ onPingTimeout()253 void onPingTimeout(); 254 } 255 256 /** 257 * Default client side {@link KeepAlivePinger}. 258 */ 259 public static final class ClientKeepAlivePinger implements KeepAlivePinger { 260 private final ConnectionClientTransport transport; 261 ClientKeepAlivePinger(ConnectionClientTransport transport)262 public ClientKeepAlivePinger(ConnectionClientTransport transport) { 263 this.transport = transport; 264 } 265 266 @Override ping()267 public void ping() { 268 transport.ping(new ClientTransport.PingCallback() { 269 @Override 270 public void onSuccess(long roundTripTimeNanos) {} 271 272 @Override 273 public void onFailure(Throwable cause) { 274 transport.shutdownNow(Status.UNAVAILABLE.withDescription( 275 "Keepalive failed. The connection is likely gone")); 276 } 277 }, MoreExecutors.directExecutor()); 278 } 279 280 @Override onPingTimeout()281 public void onPingTimeout() { 282 transport.shutdownNow(Status.UNAVAILABLE.withDescription( 283 "Keepalive failed. The connection is likely gone")); 284 } 285 } 286 287 // TODO(zsurocking): Classes below are copied from Deadline.java. We should consider share the 288 // code. 289 290 /** Time source representing nanoseconds since fixed but arbitrary point in time. */ 291 abstract static class Ticker { 292 /** Returns the number of nanoseconds since this source's epoch. */ read()293 public abstract long read(); 294 } 295 296 private static class SystemTicker extends Ticker { 297 @Override read()298 public long read() { 299 return System.nanoTime(); 300 } 301 } 302 } 303 304