/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Status;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

/**
 * Manages keepalive pings.
 *
 * <p>The manager is a small state machine (see {@link State}) driven by the transport callbacks
 * {@link #onTransportStarted()}, {@link #onDataReceived()}, {@link #onTransportActive()},
 * {@link #onTransportIdle()} and {@link #onTransportTermination()}, plus two tasks it schedules on
 * the supplied {@link ScheduledExecutorService}: one that sends a ping and one that reports a ping
 * timeout.
 *
 * <p>All mutable state is read and written while holding this object's monitor. The
 * {@link KeepAlivePinger} callbacks are always invoked after the monitor has been released.
 */
public class KeepAliveManager {
  private static final long MIN_KEEPALIVE_TIME_NANOS = TimeUnit.SECONDS.toNanos(10);
  private static final long MIN_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);

  private final ScheduledExecutorService scheduler;
  // Restarted by the constructor and by every onDataReceived() call; its elapsed time is
  // subtracted from keepAliveTimeInNanos whenever a ping is (re)scheduled.
  @GuardedBy("this")
  private final Stopwatch stopwatch;
  private final KeepAlivePinger keepAlivePinger;
  private final boolean keepAliveDuringTransportIdle;
  @GuardedBy("this")
  private State state = State.IDLE;
  // Pending ping-timeout task; assigned when a ping is sent.
  @GuardedBy("this")
  private ScheduledFuture<?> shutdownFuture;
  // Pending sendPing task, or null when none is outstanding.
  @GuardedBy("this")
  private ScheduledFuture<?> pingFuture;

  // Ping-timeout task: scheduled by sendPing, cancelled by onDataReceived() and
  // onTransportTermination().
  private final Runnable shutdown = new LogExceptionRunnable(new Runnable() {
    @Override
    public void run() {
      boolean shouldShutdown = false;
      synchronized (KeepAliveManager.this) {
        // NOTE(review): every state other than DISCONNECTED triggers the shutdown, including
        // states reached after onDataReceived() has already tried to cancel this task.
        // Future.cancel(false) does not stop a run that has already begun, so confirm that this
        // is the intended outcome of that race.
        if (state != State.DISCONNECTED) {
          // We haven't received a ping response within the timeout. The connection is likely gone
          // already. Shutdown the transport and fail all existing rpcs.
          state = State.DISCONNECTED;
          shouldShutdown = true;
        }
      }
      // Notify the pinger only after the monitor has been released.
      if (shouldShutdown) {
        keepAlivePinger.onPingTimeout();
      }
    }
  });

  // Ping task: either sends the ping and arms the timeout, or re-arms itself when data arrived
  // while it was pending. In any other state it only clears pingFuture.
  private final Runnable sendPing = new LogExceptionRunnable(new Runnable() {
    @Override
    public void run() {
      boolean shouldSendPing = false;
      synchronized (KeepAliveManager.this) {
        // This task is now running, so it is no longer an outstanding future.
        pingFuture = null;
        if (state == State.PING_SCHEDULED) {
          shouldSendPing = true;
          state = State.PING_SENT;
          // Schedule a shutdown. It fires if we don't receive the ping response within the timeout.
          shutdownFuture = scheduler.schedule(shutdown, keepAliveTimeoutInNanos,
              TimeUnit.NANOSECONDS);
        } else if (state == State.PING_DELAYED) {
          // We have received some data. Reschedule the ping with the new time, i.e. the keepalive
          // time minus whatever has elapsed since the stopwatch was last restarted.
          pingFuture = scheduler.schedule(
              sendPing,
              keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
              TimeUnit.NANOSECONDS);
          state = State.PING_SCHEDULED;
        }
      }
      // Send the ping only after the monitor has been released.
      if (shouldSendPing) {
        keepAlivePinger.ping();
      }
    }
  });

  private final long keepAliveTimeInNanos;
  private final long keepAliveTimeoutInNanos;

  private enum State {
    /*
     * We don't need to do any keepalives. This means the transport has no active rpcs and
     * keepAliveDuringTransportIdle == false.
     */
    IDLE,
    /*
     * We have scheduled a ping to be sent in the future. We may decide to delay it if we receive
     * some data.
     */
    PING_SCHEDULED,
    /*
     * We need to delay the scheduled keepalive ping.
     */
    PING_DELAYED,
    /*
     * The ping has been sent out. Waiting for a ping response.
     */
    PING_SENT,
    /*
     * Transport goes idle after ping has been sent.
     */
    IDLE_AND_PING_SENT,
    /*
     * The transport has been disconnected. We won't do keepalives any more.
     */
    DISCONNECTED,
  }

  /**
   * Creates a KeepAliveManager backed by a fresh, unstarted {@link Stopwatch}.
   *
   * <p>The time and timeout values are stored as given; this constructor does not clamp them. See
   * {@link #clampKeepAliveTimeInNanos(long)} and {@link #clampKeepAliveTimeoutInNanos(long)}.
   *
   * @param keepAlivePinger receives the ping and ping-timeout callbacks; must not be null
   * @param scheduler executor on which the ping and timeout tasks are scheduled; must not be null
   * @param keepAliveTimeInNanos delay, in nanoseconds, used when scheduling a keepalive ping
   * @param keepAliveTimeoutInNanos delay, in nanoseconds, between sending a ping and running the
   *     timeout task
   * @param keepAliveDuringTransportIdle if {@code true}, {@link #onTransportStarted()} starts
   *     keepalives and {@link #onTransportIdle()} does nothing
   */
  public KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler,
      long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
      boolean keepAliveDuringTransportIdle) {
    this(keepAlivePinger, scheduler, Stopwatch.createUnstarted(), keepAliveTimeInNanos,
        keepAliveTimeoutInNanos,
        keepAliveDuringTransportIdle);
  }

  /**
   * Same as the public constructor, but lets the caller supply the {@link Stopwatch}. The
   * stopwatch is reset and started here.
   */
  @VisibleForTesting
  KeepAliveManager(KeepAlivePinger keepAlivePinger, ScheduledExecutorService scheduler,
      Stopwatch stopwatch, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
      boolean keepAliveDuringTransportIdle) {
    this.keepAlivePinger = checkNotNull(keepAlivePinger, "keepAlivePinger");
    this.scheduler = checkNotNull(scheduler, "scheduler");
    this.stopwatch = checkNotNull(stopwatch, "stopwatch");
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;
    stopwatch.reset().start();
  }

  /**
   * Start keepalive monitoring. Only has an effect when keepalives are enabled during transport
   * idle; otherwise monitoring starts with {@link #onTransportActive()}.
   */
  public synchronized void onTransportStarted() {
    if (keepAliveDuringTransportIdle) {
      onTransportActive();
    }
  }

  /**
   * Transport has received some data so that we can delay sending keepalives.
   *
   * <p>If a ping is outstanding, the data is treated as its acknowledgement: the timeout task is
   * cancelled and, unless the transport is idle, a new ping is scheduled.
   */
  public synchronized void onDataReceived() {
    stopwatch.reset().start();
    // We do not cancel the ping future here. This avoids constantly scheduling and cancellation in
    // a busy transport. Instead, we update the status here and reschedule later. So we actually
    // keep one sendPing task always in flight when there are active rpcs.
    if (state == State.PING_SCHEDULED) {
      state = State.PING_DELAYED;
    } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
      // Ping acked or effectively ping acked. Cancel shutdown, and then if not idle,
      // schedule a new keep-alive ping.
      if (shutdownFuture != null) {
        shutdownFuture.cancel(false);
      }
      if (state == State.IDLE_AND_PING_SENT) {
        // not to schedule new pings until onTransportActive
        state = State.IDLE;
        return;
      }
      // schedule a new ping
      state = State.PING_SCHEDULED;
      checkState(pingFuture == null, "There should be no outstanding pingFuture");
      pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS);
    }
  }

  /**
   * Transport has active streams. Start sending keepalives if necessary.
   */
  public synchronized void onTransportActive() {
    if (state == State.IDLE) {
      // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
      // quickly check whether the connection is still working.
      state = State.PING_SCHEDULED;
      // A sendPing task scheduled before the transport went idle may still be pending; reuse it
      // rather than scheduling a second one.
      if (pingFuture == null) {
        pingFuture = scheduler.schedule(
            sendPing,
            keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
            TimeUnit.NANOSECONDS);
      }
    } else if (state == State.IDLE_AND_PING_SENT) {
      state = State.PING_SENT;
    } // Other states are possible when keepAliveDuringTransportIdle == true
  }

  /**
   * Transport has finished all streams. Does nothing when keepalives are enabled during transport
   * idle.
   */
  public synchronized void onTransportIdle() {
    if (keepAliveDuringTransportIdle) {
      return;
    }
    // The pending sendPing task is deliberately left in place; it does not send a ping when it
    // finds the state IDLE.
    if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
      state = State.IDLE;
    }
    if (state == State.PING_SENT) {
      state = State.IDLE_AND_PING_SENT;
    }
  }

  /**
   * Transport is being terminated. We no longer need to do keepalives, so any pending ping or
   * timeout task is cancelled.
   */
  public synchronized void onTransportTermination() {
    if (state != State.DISCONNECTED) {
      state = State.DISCONNECTED;
      if (shutdownFuture != null) {
        shutdownFuture.cancel(false);
      }
      if (pingFuture != null) {
        pingFuture.cancel(false);
        pingFuture = null;
      }
    }
  }

  /**
   * Bumps keepalive time to 10 seconds if the specified value was smaller than that.
   *
   * @param keepAliveTimeInNanos requested keepalive time in nanoseconds
   * @return the larger of the requested value and 10 seconds, in nanoseconds
   */
  public static long clampKeepAliveTimeInNanos(long keepAliveTimeInNanos) {
    return Math.max(keepAliveTimeInNanos, MIN_KEEPALIVE_TIME_NANOS);
  }

  /**
   * Bumps keepalive timeout to 10 milliseconds if the specified value was smaller than that.
   *
   * @param keepAliveTimeoutInNanos requested keepalive timeout in nanoseconds
   * @return the larger of the requested value and 10 milliseconds, in nanoseconds
   */
  public static long clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos) {
    return Math.max(keepAliveTimeoutInNanos, MIN_KEEPALIVE_TIMEOUT_NANOS);
  }

  /**
   * Callbacks through which the manager sends pings and reports ping timeouts.
   */
  public interface KeepAlivePinger {
    /**
     * Sends out a keep-alive ping.
     */
    void ping();

    /**
     * Callback when Ping Ack was not received in KEEPALIVE_TIMEOUT. Should shutdown the transport.
     */
    void onPingTimeout();
  }

  /**
   * Default client side {@link KeepAlivePinger}.
   */
  public static final class ClientKeepAlivePinger implements KeepAlivePinger {
    private final ConnectionClientTransport transport;

    public ClientKeepAlivePinger(ConnectionClientTransport transport) {
      this.transport = transport;
    }

    /**
     * Pings through the transport; a failed ping shuts the transport down with
     * {@code UNAVAILABLE}, while a successful one is ignored here.
     */
    @Override
    public void ping() {
      transport.ping(new ClientTransport.PingCallback() {
        @Override
        public void onSuccess(long roundTripTimeNanos) {}

        @Override
        public void onFailure(Throwable cause) {
          transport.shutdownNow(Status.UNAVAILABLE.withDescription(
              "Keepalive failed. The connection is likely gone"));
        }
      }, MoreExecutors.directExecutor());
    }

    /**
     * Shuts the transport down with {@code UNAVAILABLE}.
     */
    @Override
    public void onPingTimeout() {
      transport.shutdownNow(Status.UNAVAILABLE.withDescription(
          "Keepalive failed. The connection is likely gone"));
    }
  }
}