1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.MoreObjects; 21 import com.google.common.base.Preconditions; 22 import com.google.common.collect.ImmutableList; 23 import com.google.common.util.concurrent.ListenableFuture; 24 import com.google.common.util.concurrent.SettableFuture; 25 import io.grpc.Attributes; 26 import io.grpc.InternalChannelz.SocketStats; 27 import io.grpc.InternalLogId; 28 import io.grpc.ServerStreamTracer; 29 import io.grpc.Status; 30 import io.grpc.internal.ServerTransport; 31 import io.grpc.internal.ServerTransportListener; 32 import io.grpc.internal.TransportTracer; 33 import io.netty.channel.Channel; 34 import io.netty.channel.ChannelFuture; 35 import io.netty.channel.ChannelFutureListener; 36 import io.netty.channel.ChannelHandler; 37 import io.netty.channel.ChannelPromise; 38 import io.netty.util.concurrent.Future; 39 import io.netty.util.concurrent.GenericFutureListener; 40 import java.io.IOException; 41 import java.net.SocketAddress; 42 import java.net.SocketException; 43 import java.util.List; 44 import java.util.concurrent.ScheduledExecutorService; 45 import java.util.logging.Level; 46 import java.util.logging.Logger; 47 48 /** 49 * The Netty-based server transport. 50 */ 51 class NettyServerTransport implements ServerTransport { 52 // connectionLog is for connection related messages only 53 private static final Logger connectionLog = Logger.getLogger( 54 String.format("%s.connections", NettyServerTransport.class.getName())); 55 56 // Some exceptions are not very useful and add too much noise to the log 57 private static final ImmutableList<String> QUIET_EXCEPTIONS = ImmutableList.of( 58 "NativeIoException" /* Netty exceptions */); 59 60 private final InternalLogId logId; 61 private final Channel channel; 62 private final ChannelPromise channelUnused; 63 private final ProtocolNegotiator protocolNegotiator; 64 private final int maxStreams; 65 // only accessed from channel event loop 66 private NettyServerHandler grpcHandler; 67 private ServerTransportListener listener; 68 private boolean terminated; 69 private final boolean autoFlowControl; 70 private final int flowControlWindow; 71 private final int maxMessageSize; 72 private final int maxHeaderListSize; 73 private final long keepAliveTimeInNanos; 74 private final long keepAliveTimeoutInNanos; 75 private final long maxConnectionIdleInNanos; 76 private final long maxConnectionAgeInNanos; 77 private final long maxConnectionAgeGraceInNanos; 78 private final boolean permitKeepAliveWithoutCalls; 79 private final long permitKeepAliveTimeInNanos; 80 private final Attributes eagAttributes; 81 private final List<? extends ServerStreamTracer.Factory> streamTracerFactories; 82 private final TransportTracer transportTracer; 83 NettyServerTransport( Channel channel, ChannelPromise channelUnused, ProtocolNegotiator protocolNegotiator, List<? extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes)84 NettyServerTransport( 85 Channel channel, 86 ChannelPromise channelUnused, 87 ProtocolNegotiator protocolNegotiator, 88 List<? extends ServerStreamTracer.Factory> streamTracerFactories, 89 TransportTracer transportTracer, 90 int maxStreams, 91 boolean autoFlowControl, 92 int flowControlWindow, 93 int maxMessageSize, 94 int maxHeaderListSize, 95 long keepAliveTimeInNanos, 96 long keepAliveTimeoutInNanos, 97 long maxConnectionIdleInNanos, 98 long maxConnectionAgeInNanos, 99 long maxConnectionAgeGraceInNanos, 100 boolean permitKeepAliveWithoutCalls, 101 long permitKeepAliveTimeInNanos, 102 Attributes eagAttributes) { 103 this.channel = Preconditions.checkNotNull(channel, "channel"); 104 this.channelUnused = channelUnused; 105 this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator"); 106 this.streamTracerFactories = 107 Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories"); 108 this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); 109 this.maxStreams = maxStreams; 110 this.autoFlowControl = autoFlowControl; 111 this.flowControlWindow = flowControlWindow; 112 this.maxMessageSize = maxMessageSize; 113 this.maxHeaderListSize = maxHeaderListSize; 114 this.keepAliveTimeInNanos = keepAliveTimeInNanos; 115 this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; 116 this.maxConnectionIdleInNanos = maxConnectionIdleInNanos; 117 this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; 118 this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; 119 this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; 120 this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; 121 this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); 122 SocketAddress remote = channel.remoteAddress(); 123 this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null); 124 } 125 start(ServerTransportListener listener)126 public void start(ServerTransportListener listener) { 127 Preconditions.checkState(this.listener == null, "Handler already registered"); 128 this.listener = listener; 129 130 // Create the Netty handler for the pipeline. 131 grpcHandler = createHandler(listener, channelUnused); 132 133 // Notify when the channel closes. 134 final class TerminationNotifier implements ChannelFutureListener { 135 boolean done; 136 137 @Override 138 public void operationComplete(ChannelFuture future) throws Exception { 139 if (!done) { 140 done = true; 141 notifyTerminated(grpcHandler.connectionError()); 142 } 143 } 144 } 145 146 ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler); 147 ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler); 148 149 ChannelFutureListener terminationNotifier = new TerminationNotifier(); 150 channelUnused.addListener(terminationNotifier); 151 channel.closeFuture().addListener(terminationNotifier); 152 153 channel.pipeline().addLast(bufferingHandler); 154 } 155 156 @Override getScheduledExecutorService()157 public ScheduledExecutorService getScheduledExecutorService() { 158 return channel.eventLoop(); 159 } 160 161 @Override shutdown()162 public void shutdown() { 163 if (channel.isOpen()) { 164 channel.close(); 165 } 166 } 167 168 @Override shutdownNow(Status reason)169 public void shutdownNow(Status reason) { 170 if (channel.isOpen()) { 171 channel.writeAndFlush(new ForcefulCloseCommand(reason)); 172 } 173 } 174 175 @Override getLogId()176 public InternalLogId getLogId() { 177 return logId; 178 } 179 180 /** 181 * For testing purposes only. 182 */ channel()183 Channel channel() { 184 return channel; 185 } 186 187 /** 188 * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions 189 * should not clutter the log. 190 */ 191 @VisibleForTesting getLogLevel(Throwable t)192 static Level getLogLevel(Throwable t) { 193 if (t.getClass().equals(IOException.class) 194 || t.getClass().equals(SocketException.class) 195 || QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName())) { 196 return Level.FINE; 197 } 198 return Level.INFO; 199 } 200 notifyTerminated(Throwable t)201 private void notifyTerminated(Throwable t) { 202 if (t != null) { 203 connectionLog.log(getLogLevel(t), "Transport failed", t); 204 } 205 if (!terminated) { 206 terminated = true; 207 listener.transportTerminated(); 208 } 209 } 210 211 @Override getStats()212 public ListenableFuture<SocketStats> getStats() { 213 final SettableFuture<SocketStats> result = SettableFuture.create(); 214 if (channel.eventLoop().inEventLoop()) { 215 // This is necessary, otherwise we will block forever if we get the future from inside 216 // the event loop. 217 result.set(getStatsHelper(channel)); 218 return result; 219 } 220 channel.eventLoop().submit( 221 new Runnable() { 222 @Override 223 public void run() { 224 result.set(getStatsHelper(channel)); 225 } 226 }) 227 .addListener( 228 new GenericFutureListener<Future<Object>>() { 229 @Override 230 public void operationComplete(Future<Object> future) throws Exception { 231 if (!future.isSuccess()) { 232 result.setException(future.cause()); 233 } 234 } 235 }); 236 return result; 237 } 238 getStatsHelper(Channel ch)239 private SocketStats getStatsHelper(Channel ch) { 240 Preconditions.checkState(ch.eventLoop().inEventLoop()); 241 return new SocketStats( 242 transportTracer.getStats(), 243 channel.localAddress(), 244 channel.remoteAddress(), 245 Utils.getSocketOptions(ch), 246 grpcHandler == null ? null : grpcHandler.getSecurityInfo()); 247 248 } 249 250 @Override toString()251 public String toString() { 252 return MoreObjects.toStringHelper(this) 253 .add("logId", logId.getId()) 254 .add("channel", channel) 255 .toString(); 256 } 257 258 /** 259 * Creates the Netty handler to be used in the channel pipeline. 260 */ createHandler( ServerTransportListener transportListener, ChannelPromise channelUnused)261 private NettyServerHandler createHandler( 262 ServerTransportListener transportListener, ChannelPromise channelUnused) { 263 return NettyServerHandler.newHandler( 264 transportListener, 265 channelUnused, 266 streamTracerFactories, 267 transportTracer, 268 maxStreams, 269 autoFlowControl, 270 flowControlWindow, 271 maxHeaderListSize, 272 maxMessageSize, 273 keepAliveTimeInNanos, 274 keepAliveTimeoutInNanos, 275 maxConnectionIdleInNanos, 276 maxConnectionAgeInNanos, 277 maxConnectionAgeGraceInNanos, 278 permitKeepAliveWithoutCalls, 279 permitKeepAliveTimeInNanos, 280 eagAttributes); 281 } 282 } 283