• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.netty;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.MoreObjects;
21 import com.google.common.base.Preconditions;
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.SettableFuture;
25 import io.grpc.Attributes;
26 import io.grpc.InternalChannelz.SocketStats;
27 import io.grpc.InternalLogId;
28 import io.grpc.ServerStreamTracer;
29 import io.grpc.Status;
30 import io.grpc.internal.ServerTransport;
31 import io.grpc.internal.ServerTransportListener;
32 import io.grpc.internal.TransportTracer;
33 import io.netty.channel.Channel;
34 import io.netty.channel.ChannelFuture;
35 import io.netty.channel.ChannelFutureListener;
36 import io.netty.channel.ChannelHandler;
37 import io.netty.channel.ChannelPromise;
38 import io.netty.util.concurrent.Future;
39 import io.netty.util.concurrent.GenericFutureListener;
40 import java.io.IOException;
41 import java.net.SocketAddress;
42 import java.net.SocketException;
43 import java.util.List;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.logging.Level;
46 import java.util.logging.Logger;
47 
48 /**
49  * The Netty-based server transport.
50  */
51 class NettyServerTransport implements ServerTransport {
52   // connectionLog is for connection related messages only
53   private static final Logger connectionLog = Logger.getLogger(
54       String.format("%s.connections", NettyServerTransport.class.getName()));
55 
56   // Some exceptions are not very useful and add too much noise to the log
57   private static final ImmutableList<String> QUIET_EXCEPTIONS = ImmutableList.of(
58       "NativeIoException" /* Netty exceptions */);
59 
60   private final InternalLogId logId;
61   private final Channel channel;
62   private final ChannelPromise channelUnused;
63   private final ProtocolNegotiator protocolNegotiator;
64   private final int maxStreams;
65   // only accessed from channel event loop
66   private NettyServerHandler grpcHandler;
67   private ServerTransportListener listener;
68   private boolean terminated;
69   private final boolean autoFlowControl;
70   private final int flowControlWindow;
71   private final int maxMessageSize;
72   private final int maxHeaderListSize;
73   private final long keepAliveTimeInNanos;
74   private final long keepAliveTimeoutInNanos;
75   private final long maxConnectionIdleInNanos;
76   private final long maxConnectionAgeInNanos;
77   private final long maxConnectionAgeGraceInNanos;
78   private final boolean permitKeepAliveWithoutCalls;
79   private final long permitKeepAliveTimeInNanos;
80   private final Attributes eagAttributes;
81   private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
82   private final TransportTracer transportTracer;
83 
NettyServerTransport( Channel channel, ChannelPromise channelUnused, ProtocolNegotiator protocolNegotiator, List<? extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes)84   NettyServerTransport(
85       Channel channel,
86       ChannelPromise channelUnused,
87       ProtocolNegotiator protocolNegotiator,
88       List<? extends ServerStreamTracer.Factory> streamTracerFactories,
89       TransportTracer transportTracer,
90       int maxStreams,
91       boolean autoFlowControl,
92       int flowControlWindow,
93       int maxMessageSize,
94       int maxHeaderListSize,
95       long keepAliveTimeInNanos,
96       long keepAliveTimeoutInNanos,
97       long maxConnectionIdleInNanos,
98       long maxConnectionAgeInNanos,
99       long maxConnectionAgeGraceInNanos,
100       boolean permitKeepAliveWithoutCalls,
101       long permitKeepAliveTimeInNanos,
102       Attributes eagAttributes) {
103     this.channel = Preconditions.checkNotNull(channel, "channel");
104     this.channelUnused = channelUnused;
105     this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
106     this.streamTracerFactories =
107         Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
108     this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
109     this.maxStreams = maxStreams;
110     this.autoFlowControl = autoFlowControl;
111     this.flowControlWindow = flowControlWindow;
112     this.maxMessageSize = maxMessageSize;
113     this.maxHeaderListSize = maxHeaderListSize;
114     this.keepAliveTimeInNanos = keepAliveTimeInNanos;
115     this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
116     this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
117     this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
118     this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
119     this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
120     this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
121     this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
122     SocketAddress remote = channel.remoteAddress();
123     this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null);
124   }
125 
start(ServerTransportListener listener)126   public void start(ServerTransportListener listener) {
127     Preconditions.checkState(this.listener == null, "Handler already registered");
128     this.listener = listener;
129 
130     // Create the Netty handler for the pipeline.
131     grpcHandler = createHandler(listener, channelUnused);
132 
133     // Notify when the channel closes.
134     final class TerminationNotifier implements ChannelFutureListener {
135       boolean done;
136 
137       @Override
138       public void operationComplete(ChannelFuture future) throws Exception {
139         if (!done) {
140           done = true;
141           notifyTerminated(grpcHandler.connectionError());
142         }
143       }
144     }
145 
146     ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
147     ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
148 
149     ChannelFutureListener terminationNotifier = new TerminationNotifier();
150     channelUnused.addListener(terminationNotifier);
151     channel.closeFuture().addListener(terminationNotifier);
152 
153     channel.pipeline().addLast(bufferingHandler);
154   }
155 
156   @Override
getScheduledExecutorService()157   public ScheduledExecutorService getScheduledExecutorService() {
158     return channel.eventLoop();
159   }
160 
161   @Override
shutdown()162   public void shutdown() {
163     if (channel.isOpen()) {
164       channel.close();
165     }
166   }
167 
168   @Override
shutdownNow(Status reason)169   public void shutdownNow(Status reason) {
170     if (channel.isOpen()) {
171       channel.writeAndFlush(new ForcefulCloseCommand(reason));
172     }
173   }
174 
175   @Override
getLogId()176   public InternalLogId getLogId() {
177     return logId;
178   }
179 
180   /**
181    * For testing purposes only.
182    */
channel()183   Channel channel() {
184     return channel;
185   }
186 
187   /**
188    * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions
189    * should not clutter the log.
190    */
191   @VisibleForTesting
getLogLevel(Throwable t)192   static Level getLogLevel(Throwable t) {
193     if (t.getClass().equals(IOException.class)
194         || t.getClass().equals(SocketException.class)
195         || QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName())) {
196       return Level.FINE;
197     }
198     return Level.INFO;
199   }
200 
notifyTerminated(Throwable t)201   private void notifyTerminated(Throwable t) {
202     if (t != null) {
203       connectionLog.log(getLogLevel(t), "Transport failed", t);
204     }
205     if (!terminated) {
206       terminated = true;
207       listener.transportTerminated();
208     }
209   }
210 
211   @Override
getStats()212   public ListenableFuture<SocketStats> getStats() {
213     final SettableFuture<SocketStats> result = SettableFuture.create();
214     if (channel.eventLoop().inEventLoop()) {
215       // This is necessary, otherwise we will block forever if we get the future from inside
216       // the event loop.
217       result.set(getStatsHelper(channel));
218       return result;
219     }
220     channel.eventLoop().submit(
221         new Runnable() {
222           @Override
223           public void run() {
224             result.set(getStatsHelper(channel));
225           }
226         })
227         .addListener(
228             new GenericFutureListener<Future<Object>>() {
229               @Override
230               public void operationComplete(Future<Object> future) throws Exception {
231                 if (!future.isSuccess()) {
232                   result.setException(future.cause());
233                 }
234               }
235             });
236     return result;
237   }
238 
getStatsHelper(Channel ch)239   private SocketStats getStatsHelper(Channel ch) {
240     Preconditions.checkState(ch.eventLoop().inEventLoop());
241     return new SocketStats(
242         transportTracer.getStats(),
243         channel.localAddress(),
244         channel.remoteAddress(),
245         Utils.getSocketOptions(ch),
246         grpcHandler == null ? null : grpcHandler.getSecurityInfo());
247 
248   }
249 
250   @Override
toString()251   public String toString() {
252     return MoreObjects.toStringHelper(this)
253         .add("logId", logId.getId())
254         .add("channel", channel)
255         .toString();
256   }
257 
258   /**
259    * Creates the Netty handler to be used in the channel pipeline.
260    */
createHandler( ServerTransportListener transportListener, ChannelPromise channelUnused)261   private NettyServerHandler createHandler(
262       ServerTransportListener transportListener, ChannelPromise channelUnused) {
263     return NettyServerHandler.newHandler(
264         transportListener,
265         channelUnused,
266         streamTracerFactories,
267         transportTracer,
268         maxStreams,
269         autoFlowControl,
270         flowControlWindow,
271         maxHeaderListSize,
272         maxMessageSize,
273         keepAliveTimeInNanos,
274         keepAliveTimeoutInNanos,
275         maxConnectionIdleInNanos,
276         maxConnectionAgeInNanos,
277         maxConnectionAgeGraceInNanos,
278         permitKeepAliveWithoutCalls,
279         permitKeepAliveTimeInNanos,
280         eagAttributes);
281   }
282 }
283