• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.MoreObjects;
21 import com.google.common.base.Preconditions;
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.SettableFuture;
25 import io.grpc.InternalChannelz.SocketStats;
26 import io.grpc.InternalLogId;
27 import io.grpc.ServerStreamTracer;
28 import io.grpc.Status;
29 import io.grpc.internal.ServerTransport;
30 import io.grpc.internal.ServerTransportListener;
31 import io.grpc.internal.TransportTracer;
32 import io.netty.channel.Channel;
33 import io.netty.channel.ChannelFuture;
34 import io.netty.channel.ChannelFutureListener;
35 import io.netty.channel.ChannelHandler;
36 import io.netty.channel.ChannelPromise;
37 import io.netty.util.concurrent.Future;
38 import io.netty.util.concurrent.GenericFutureListener;
39 import java.io.IOException;
40 import java.util.List;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.logging.Level;
43 import java.util.logging.Logger;
44 
45 /**
46  * The Netty-based server transport.
47  */
48 class NettyServerTransport implements ServerTransport {
49   private static final Logger log = Logger.getLogger(NettyServerTransport.class.getName());
50   // connectionLog is for connection related messages only
51   private static final Logger connectionLog = Logger.getLogger(
52       String.format("%s.connections", NettyServerTransport.class.getName()));
53   // Some exceptions are not very useful and add too much noise to the log
54   private static final ImmutableList<String> QUIET_ERRORS = ImmutableList.of(
55       "Connection reset by peer",
56       "An existing connection was forcibly closed by the remote host");
57 
58   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
59   private final Channel channel;
60   private final ChannelPromise channelUnused;
61   private final ProtocolNegotiator protocolNegotiator;
62   private final int maxStreams;
63   // only accessed from channel event loop
64   private NettyServerHandler grpcHandler;
65   private ServerTransportListener listener;
66   private boolean terminated;
67   private final int flowControlWindow;
68   private final int maxMessageSize;
69   private final int maxHeaderListSize;
70   private final long keepAliveTimeInNanos;
71   private final long keepAliveTimeoutInNanos;
72   private final long maxConnectionIdleInNanos;
73   private final long maxConnectionAgeInNanos;
74   private final long maxConnectionAgeGraceInNanos;
75   private final boolean permitKeepAliveWithoutCalls;
76   private final long permitKeepAliveTimeInNanos;
77   private final List<ServerStreamTracer.Factory> streamTracerFactories;
78   private final TransportTracer transportTracer;
79 
NettyServerTransport( Channel channel, ChannelPromise channelUnused, ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos)80   NettyServerTransport(
81       Channel channel,
82       ChannelPromise channelUnused,
83       ProtocolNegotiator protocolNegotiator,
84       List<ServerStreamTracer.Factory> streamTracerFactories,
85       TransportTracer transportTracer,
86       int maxStreams,
87       int flowControlWindow,
88       int maxMessageSize,
89       int maxHeaderListSize,
90       long keepAliveTimeInNanos,
91       long keepAliveTimeoutInNanos,
92       long maxConnectionIdleInNanos,
93       long maxConnectionAgeInNanos,
94       long maxConnectionAgeGraceInNanos,
95       boolean permitKeepAliveWithoutCalls,
96       long permitKeepAliveTimeInNanos) {
97     this.channel = Preconditions.checkNotNull(channel, "channel");
98     this.channelUnused = channelUnused;
99     this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
100     this.streamTracerFactories =
101         Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
102     this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
103     this.maxStreams = maxStreams;
104     this.flowControlWindow = flowControlWindow;
105     this.maxMessageSize = maxMessageSize;
106     this.maxHeaderListSize = maxHeaderListSize;
107     this.keepAliveTimeInNanos = keepAliveTimeInNanos;
108     this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
109     this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
110     this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
111     this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
112     this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
113     this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
114   }
115 
start(ServerTransportListener listener)116   public void start(ServerTransportListener listener) {
117     Preconditions.checkState(this.listener == null, "Handler already registered");
118     this.listener = listener;
119 
120     // Create the Netty handler for the pipeline.
121     grpcHandler = createHandler(listener, channelUnused);
122     NettyHandlerSettings.setAutoWindow(grpcHandler);
123 
124     // Notify when the channel closes.
125     final class TerminationNotifier implements ChannelFutureListener {
126       boolean done;
127 
128       @Override
129       public void operationComplete(ChannelFuture future) throws Exception {
130         if (!done) {
131           done = true;
132           notifyTerminated(grpcHandler.connectionError());
133         }
134       }
135     }
136 
137     ChannelFutureListener terminationNotifier = new TerminationNotifier();
138     channelUnused.addListener(terminationNotifier);
139     channel.closeFuture().addListener(terminationNotifier);
140 
141     ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
142     channel.pipeline().addLast(negotiationHandler);
143   }
144 
145   @Override
getScheduledExecutorService()146   public ScheduledExecutorService getScheduledExecutorService() {
147     return channel.eventLoop();
148   }
149 
150   @Override
shutdown()151   public void shutdown() {
152     if (channel.isOpen()) {
153       channel.close();
154     }
155   }
156 
157   @Override
shutdownNow(Status reason)158   public void shutdownNow(Status reason) {
159     if (channel.isOpen()) {
160       channel.writeAndFlush(new ForcefulCloseCommand(reason));
161     }
162   }
163 
164   @Override
getLogId()165   public InternalLogId getLogId() {
166     return logId;
167   }
168 
169   /**
170    * For testing purposes only.
171    */
channel()172   Channel channel() {
173     return channel;
174   }
175 
176   /**
177    * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions
178    * should not clutter the log.
179    */
180   @VisibleForTesting
getLogLevel(Throwable t)181   static Level getLogLevel(Throwable t) {
182     if (t instanceof IOException && t.getMessage() != null) {
183       for (String msg : QUIET_ERRORS) {
184         if (t.getMessage().equals(msg)) {
185           return Level.FINE;
186         }
187       }
188     }
189     return Level.INFO;
190   }
191 
notifyTerminated(Throwable t)192   private void notifyTerminated(Throwable t) {
193     if (t != null) {
194       connectionLog.log(getLogLevel(t), "Transport failed", t);
195     }
196     if (!terminated) {
197       terminated = true;
198       listener.transportTerminated();
199     }
200   }
201 
202   @Override
getStats()203   public ListenableFuture<SocketStats> getStats() {
204     final SettableFuture<SocketStats> result = SettableFuture.create();
205     if (channel.eventLoop().inEventLoop()) {
206       // This is necessary, otherwise we will block forever if we get the future from inside
207       // the event loop.
208       result.set(getStatsHelper(channel));
209       return result;
210     }
211     channel.eventLoop().submit(
212         new Runnable() {
213           @Override
214           public void run() {
215             result.set(getStatsHelper(channel));
216           }
217         })
218         .addListener(
219             new GenericFutureListener<Future<Object>>() {
220               @Override
221               public void operationComplete(Future<Object> future) throws Exception {
222                 if (!future.isSuccess()) {
223                   result.setException(future.cause());
224                 }
225               }
226             });
227     return result;
228   }
229 
getStatsHelper(Channel ch)230   private SocketStats getStatsHelper(Channel ch) {
231     Preconditions.checkState(ch.eventLoop().inEventLoop());
232     return new SocketStats(
233         transportTracer.getStats(),
234         channel.localAddress(),
235         channel.remoteAddress(),
236         Utils.getSocketOptions(ch),
237         grpcHandler == null ? null : grpcHandler.getSecurityInfo());
238 
239   }
240 
241   @Override
toString()242   public String toString() {
243     return MoreObjects.toStringHelper(this)
244         .add("logId", logId.getId())
245         .add("channel", channel)
246         .toString();
247   }
248 
249   /**
250    * Creates the Netty handler to be used in the channel pipeline.
251    */
createHandler( ServerTransportListener transportListener, ChannelPromise channelUnused)252   private NettyServerHandler createHandler(
253       ServerTransportListener transportListener, ChannelPromise channelUnused) {
254     return NettyServerHandler.newHandler(
255         transportListener,
256         channelUnused,
257         streamTracerFactories,
258         transportTracer,
259         maxStreams,
260         flowControlWindow,
261         maxHeaderListSize,
262         maxMessageSize,
263         keepAliveTimeInNanos,
264         keepAliveTimeoutInNanos,
265         maxConnectionIdleInNanos,
266         maxConnectionAgeInNanos,
267         maxConnectionAgeGraceInNanos,
268         permitKeepAliveWithoutCalls,
269         permitKeepAliveTimeInNanos);
270   }
271 }
272