1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.MoreObjects; 21 import com.google.common.base.Preconditions; 22 import com.google.common.collect.ImmutableList; 23 import com.google.common.util.concurrent.ListenableFuture; 24 import com.google.common.util.concurrent.SettableFuture; 25 import io.grpc.InternalChannelz.SocketStats; 26 import io.grpc.InternalLogId; 27 import io.grpc.ServerStreamTracer; 28 import io.grpc.Status; 29 import io.grpc.internal.ServerTransport; 30 import io.grpc.internal.ServerTransportListener; 31 import io.grpc.internal.TransportTracer; 32 import io.netty.channel.Channel; 33 import io.netty.channel.ChannelFuture; 34 import io.netty.channel.ChannelFutureListener; 35 import io.netty.channel.ChannelHandler; 36 import io.netty.channel.ChannelPromise; 37 import io.netty.util.concurrent.Future; 38 import io.netty.util.concurrent.GenericFutureListener; 39 import java.io.IOException; 40 import java.util.List; 41 import java.util.concurrent.ScheduledExecutorService; 42 import java.util.logging.Level; 43 import java.util.logging.Logger; 44 45 /** 46 * The Netty-based server transport. 47 */ 48 class NettyServerTransport implements ServerTransport { 49 private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName()); 50 // connectionLog is for connection related messages only 51 private static final Logger connectionLog = Logger.getLogger( 52 String.format("%s.connections", NettyServerTransport.class.getName())); 53 // Some exceptions are not very useful and add too much noise to the log 54 private static final ImmutableList<String> QUIET_ERRORS = ImmutableList.of( 55 "Connection reset by peer", 56 "An existing connection was forcibly closed by the remote host"); 57 58 private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); 59 private final Channel channel; 60 private final ChannelPromise channelUnused; 61 private final ProtocolNegotiator protocolNegotiator; 62 private final int maxStreams; 63 // only accessed from channel event loop 64 private NettyServerHandler grpcHandler; 65 private ServerTransportListener listener; 66 private boolean terminated; 67 private final int flowControlWindow; 68 private final int maxMessageSize; 69 private final int maxHeaderListSize; 70 private final long keepAliveTimeInNanos; 71 private final long keepAliveTimeoutInNanos; 72 private final long maxConnectionIdleInNanos; 73 private final long maxConnectionAgeInNanos; 74 private final long maxConnectionAgeGraceInNanos; 75 private final boolean permitKeepAliveWithoutCalls; 76 private final long permitKeepAliveTimeInNanos; 77 private final List<ServerStreamTracer.Factory> streamTracerFactories; 78 private final TransportTracer transportTracer; 79 NettyServerTransport( Channel channel, ChannelPromise channelUnused, ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos)80 NettyServerTransport( 81 Channel channel, 82 ChannelPromise channelUnused, 83 ProtocolNegotiator protocolNegotiator, 84 List<ServerStreamTracer.Factory> streamTracerFactories, 85 TransportTracer transportTracer, 86 int maxStreams, 87 int flowControlWindow, 88 int maxMessageSize, 89 int maxHeaderListSize, 90 long keepAliveTimeInNanos, 91 long keepAliveTimeoutInNanos, 92 long maxConnectionIdleInNanos, 93 long maxConnectionAgeInNanos, 94 long maxConnectionAgeGraceInNanos, 95 boolean permitKeepAliveWithoutCalls, 96 long permitKeepAliveTimeInNanos) { 97 this.channel = Preconditions.checkNotNull(channel, "channel"); 98 this.channelUnused = channelUnused; 99 this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); 100 this.streamTracerFactories = 101 Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); 102 this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); 103 this.maxStreams = maxStreams; 104 this.flowControlWindow = flowControlWindow; 105 this.maxMessageSize = maxMessageSize; 106 this.maxHeaderListSize = maxHeaderListSize; 107 this.keepAliveTimeInNanos = keepAliveTimeInNanos; 108 this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; 109 this.maxConnectionIdleInNanos = maxConnectionIdleInNanos; 110 this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; 111 this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; 112 this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; 113 this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; 114 } 115 start(ServerTransportListener listener)116 public void start(ServerTransportListener listener) { 117 Preconditions.checkState(this.listener == null, "Handler already registered"); 118 this.listener = listener; 119 120 // Create the Netty handler for the pipeline. 121 grpcHandler = createHandler(listener, channelUnused); 122 NettyHandlerSettings.setAutoWindow(grpcHandler); 123 124 // Notify when the channel closes. 125 final class TerminationNotifier implements ChannelFutureListener { 126 boolean done; 127 128 @Override 129 public void operationComplete(ChannelFuture future) throws Exception { 130 if (!done) { 131 done = true; 132 notifyTerminated(grpcHandler.connectionError()); 133 } 134 } 135 } 136 137 ChannelFutureListener terminationNotifier = new TerminationNotifier(); 138 channelUnused.addListener(terminationNotifier); 139 channel.closeFuture().addListener(terminationNotifier); 140 141 ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler); 142 channel.pipeline().addLast(negotiationHandler); 143 } 144 145 @Override getScheduledExecutorService()146 public ScheduledExecutorService getScheduledExecutorService() { 147 return channel.eventLoop(); 148 } 149 150 @Override shutdown()151 public void shutdown() { 152 if (channel.isOpen()) { 153 channel.close(); 154 } 155 } 156 157 @Override shutdownNow(Status reason)158 public void shutdownNow(Status reason) { 159 if (channel.isOpen()) { 160 channel.writeAndFlush(new ForcefulCloseCommand(reason)); 161 } 162 } 163 164 @Override getLogId()165 public InternalLogId getLogId() { 166 return logId; 167 } 168 169 /** 170 * For testing purposes only. 171 */ channel()172 Channel channel() { 173 return channel; 174 } 175 176 /** 177 * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions 178 * should not clutter the log. 179 */ 180 @VisibleForTesting getLogLevel(Throwable t)181 static Level getLogLevel(Throwable t) { 182 if (t instanceof IOException && t.getMessage() != null) { 183 for (String msg : QUIET_ERRORS) { 184 if (t.getMessage().equals(msg)) { 185 return Level.FINE; 186 } 187 } 188 } 189 return Level.INFO; 190 } 191 notifyTerminated(Throwable t)192 private void notifyTerminated(Throwable t) { 193 if (t != null) { 194 connectionLog.log(getLogLevel(t), "Transport failed", t); 195 } 196 if (!terminated) { 197 terminated = true; 198 listener.transportTerminated(); 199 } 200 } 201 202 @Override getStats()203 public ListenableFuture<SocketStats> getStats() { 204 final SettableFuture<SocketStats> result = SettableFuture.create(); 205 if (channel.eventLoop().inEventLoop()) { 206 // This is necessary, otherwise we will block forever if we get the future from inside 207 // the event loop. 208 result.set(getStatsHelper(channel)); 209 return result; 210 } 211 channel.eventLoop().submit( 212 new Runnable() { 213 @Override 214 public void run() { 215 result.set(getStatsHelper(channel)); 216 } 217 }) 218 .addListener( 219 new GenericFutureListener<Future<Object>>() { 220 @Override 221 public void operationComplete(Future<Object> future) throws Exception { 222 if (!future.isSuccess()) { 223 result.setException(future.cause()); 224 } 225 } 226 }); 227 return result; 228 } 229 getStatsHelper(Channel ch)230 private SocketStats getStatsHelper(Channel ch) { 231 Preconditions.checkState(ch.eventLoop().inEventLoop()); 232 return new SocketStats( 233 transportTracer.getStats(), 234 channel.localAddress(), 235 channel.remoteAddress(), 236 Utils.getSocketOptions(ch), 237 grpcHandler == null ? null : grpcHandler.getSecurityInfo()); 238 239 } 240 241 @Override toString()242 public String toString() { 243 return MoreObjects.toStringHelper(this) 244 .add("logId", logId.getId()) 245 .add("channel", channel) 246 .toString(); 247 } 248 249 /** 250 * Creates the Netty handler to be used in the channel pipeline. 251 */ createHandler( ServerTransportListener transportListener, ChannelPromise channelUnused)252 private NettyServerHandler createHandler( 253 ServerTransportListener transportListener, ChannelPromise channelUnused) { 254 return NettyServerHandler.newHandler( 255 transportListener, 256 channelUnused, 257 streamTracerFactories, 258 transportTracer, 259 maxStreams, 260 flowControlWindow, 261 maxHeaderListSize, 262 maxMessageSize, 263 keepAliveTimeInNanos, 264 keepAliveTimeoutInNanos, 265 maxConnectionIdleInNanos, 266 maxConnectionAgeInNanos, 267 maxConnectionAgeGraceInNanos, 268 permitKeepAliveWithoutCalls, 269 permitKeepAliveTimeInNanos); 270 } 271 } 272