• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
23 import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Ticker;
27 import com.google.errorprone.annotations.CanIgnoreReturnValue;
28 import com.google.errorprone.annotations.CheckReturnValue;
29 import com.google.errorprone.annotations.InlineMe;
30 import io.grpc.Attributes;
31 import io.grpc.CallCredentials;
32 import io.grpc.ChannelCredentials;
33 import io.grpc.ChannelLogger;
34 import io.grpc.EquivalentAddressGroup;
35 import io.grpc.ExperimentalApi;
36 import io.grpc.HttpConnectProxiedSocketAddress;
37 import io.grpc.Internal;
38 import io.grpc.ManagedChannelBuilder;
39 import io.grpc.internal.AbstractManagedChannelImplBuilder;
40 import io.grpc.internal.AtomicBackoff;
41 import io.grpc.internal.ClientTransportFactory;
42 import io.grpc.internal.ConnectionClientTransport;
43 import io.grpc.internal.FixedObjectPool;
44 import io.grpc.internal.GrpcUtil;
45 import io.grpc.internal.KeepAliveManager;
46 import io.grpc.internal.ManagedChannelImplBuilder;
47 import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider;
48 import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
49 import io.grpc.internal.ObjectPool;
50 import io.grpc.internal.SharedResourcePool;
51 import io.grpc.internal.TransportTracer;
52 import io.grpc.netty.ProtocolNegotiators.FromChannelCredentialsResult;
53 import io.netty.channel.Channel;
54 import io.netty.channel.ChannelFactory;
55 import io.netty.channel.ChannelOption;
56 import io.netty.channel.EventLoopGroup;
57 import io.netty.channel.ReflectiveChannelFactory;
58 import io.netty.channel.socket.nio.NioSocketChannel;
59 import io.netty.handler.ssl.SslContext;
60 import java.net.InetSocketAddress;
61 import java.net.SocketAddress;
62 import java.util.HashMap;
63 import java.util.Map;
64 import java.util.concurrent.Executor;
65 import java.util.concurrent.ScheduledExecutorService;
66 import java.util.concurrent.TimeUnit;
67 import javax.annotation.Nullable;
68 import javax.net.ssl.SSLException;
69 
70 /**
71  * A builder to help simplify construction of channels using the Netty transport.
72  */
73 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
74 @CheckReturnValue
75 public final class NettyChannelBuilder extends
76     AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
77 
78   // 1MiB.
79   public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1024 * 1024;
80   private static final boolean DEFAULT_AUTO_FLOW_CONTROL;
81 
82   private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
83 
84   private static final ChannelFactory<? extends Channel> DEFAULT_CHANNEL_FACTORY =
85       new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
86   private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
87       SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
88 
89   static {
90     String autoFlowControl = System.getenv("GRPC_EXPERIMENTAL_AUTOFLOWCONTROL");
91     if (autoFlowControl == null) {
92       autoFlowControl = "true";
93     }
94     DEFAULT_AUTO_FLOW_CONTROL = Boolean.parseBoolean(autoFlowControl);
95   }
96 
97   private final ManagedChannelImplBuilder managedChannelImplBuilder;
98   private TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
99   private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>();
100   private ChannelFactory<? extends Channel> channelFactory = DEFAULT_CHANNEL_FACTORY;
101   private ObjectPool<? extends EventLoopGroup> eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL;
102   private boolean autoFlowControl = DEFAULT_AUTO_FLOW_CONTROL;
103   private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
104   private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
105   private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
106   private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
107   private boolean keepAliveWithoutCalls;
108   private ProtocolNegotiator.ClientFactory protocolNegotiatorFactory
109       = new DefaultProtocolNegotiator();
110   private final boolean freezeProtocolNegotiatorFactory;
111   private LocalSocketPicker localSocketPicker;
112 
113   /**
114    * If true, indicates that the transport may use the GET method for RPCs, and may include the
115    * request body in the query params.
116    */
117   private final boolean useGetForSafeMethods = false;
118 
119   /**
120    * Creates a new builder with the given server address. This factory method is primarily intended
121    * for using Netty Channel types other than SocketChannel. {@link #forAddress(String, int)} should
122    * generally be preferred over this method, since that API permits delaying DNS lookups and
123    * noticing changes to DNS. If an unresolved InetSocketAddress is passed in, then it will remain
124    * unresolved.
125    */
forAddress(SocketAddress serverAddress)126   public static NettyChannelBuilder forAddress(SocketAddress serverAddress) {
127     return new NettyChannelBuilder(serverAddress);
128   }
129 
130   /**
131    * Creates a new builder with the given server address. This factory method is primarily intended
132    * for using Netty Channel types other than SocketChannel.
133    * {@link #forAddress(String, int, ChannelCredentials)} should generally be preferred over this
134    * method, since that API permits delaying DNS lookups and noticing changes to DNS. If an
135    * unresolved InetSocketAddress is passed in, then it will remain unresolved.
136    */
forAddress(SocketAddress serverAddress, ChannelCredentials creds)137   public static NettyChannelBuilder forAddress(SocketAddress serverAddress,
138       ChannelCredentials creds) {
139     FromChannelCredentialsResult result = ProtocolNegotiators.from(creds);
140     if (result.error != null) {
141       throw new IllegalArgumentException(result.error);
142     }
143     return new NettyChannelBuilder(serverAddress, creds, result.callCredentials, result.negotiator);
144   }
145 
146   /**
147    * Creates a new builder with the given host and port.
148    */
forAddress(String host, int port)149   public static NettyChannelBuilder forAddress(String host, int port) {
150     return forTarget(GrpcUtil.authorityFromHostAndPort(host, port));
151   }
152 
153   /**
154    * Creates a new builder with the given host and port.
155    */
forAddress(String host, int port, ChannelCredentials creds)156   public static NettyChannelBuilder forAddress(String host, int port, ChannelCredentials creds) {
157     return forTarget(GrpcUtil.authorityFromHostAndPort(host, port), creds);
158   }
159 
160   /**
161    * Creates a new builder with the given target string that will be resolved by
162    * {@link io.grpc.NameResolver}.
163    */
forTarget(String target)164   public static NettyChannelBuilder forTarget(String target) {
165     return new NettyChannelBuilder(target);
166   }
167 
168   /**
169    * Creates a new builder with the given target string that will be resolved by
170    * {@link io.grpc.NameResolver}.
171    */
forTarget(String target, ChannelCredentials creds)172   public static NettyChannelBuilder forTarget(String target, ChannelCredentials creds) {
173     FromChannelCredentialsResult result = ProtocolNegotiators.from(creds);
174     if (result.error != null) {
175       throw new IllegalArgumentException(result.error);
176     }
177     return new NettyChannelBuilder(target, creds, result.callCredentials, result.negotiator);
178   }
179 
180   private final class NettyChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder {
181     @Override
buildClientTransportFactory()182     public ClientTransportFactory buildClientTransportFactory() {
183       return buildTransportFactory();
184     }
185   }
186 
187   private final class NettyChannelDefaultPortProvider implements ChannelBuilderDefaultPortProvider {
188     @Override
getDefaultPort()189     public int getDefaultPort() {
190       return protocolNegotiatorFactory.getDefaultPort();
191     }
192   }
193 
NettyChannelBuilder(String target)194   NettyChannelBuilder(String target) {
195     managedChannelImplBuilder = new ManagedChannelImplBuilder(target,
196         new NettyChannelTransportFactoryBuilder(),
197         new NettyChannelDefaultPortProvider());
198     this.freezeProtocolNegotiatorFactory = false;
199   }
200 
NettyChannelBuilder( String target, ChannelCredentials channelCreds, CallCredentials callCreds, ProtocolNegotiator.ClientFactory negotiator)201   NettyChannelBuilder(
202       String target, ChannelCredentials channelCreds, CallCredentials callCreds,
203       ProtocolNegotiator.ClientFactory negotiator) {
204     managedChannelImplBuilder = new ManagedChannelImplBuilder(
205         target, channelCreds, callCreds,
206         new NettyChannelTransportFactoryBuilder(),
207         new NettyChannelDefaultPortProvider());
208     this.protocolNegotiatorFactory = checkNotNull(negotiator, "negotiator");
209     this.freezeProtocolNegotiatorFactory = true;
210   }
211 
NettyChannelBuilder(SocketAddress address)212   NettyChannelBuilder(SocketAddress address) {
213     managedChannelImplBuilder = new ManagedChannelImplBuilder(address,
214         getAuthorityFromAddress(address),
215         new NettyChannelTransportFactoryBuilder(),
216         new NettyChannelDefaultPortProvider());
217     this.freezeProtocolNegotiatorFactory = false;
218   }
219 
NettyChannelBuilder( SocketAddress address, ChannelCredentials channelCreds, CallCredentials callCreds, ProtocolNegotiator.ClientFactory negotiator)220   NettyChannelBuilder(
221       SocketAddress address, ChannelCredentials channelCreds, CallCredentials callCreds,
222       ProtocolNegotiator.ClientFactory negotiator) {
223     managedChannelImplBuilder = new ManagedChannelImplBuilder(address,
224         getAuthorityFromAddress(address),
225         channelCreds, callCreds,
226         new NettyChannelTransportFactoryBuilder(),
227         new NettyChannelDefaultPortProvider());
228     this.protocolNegotiatorFactory = checkNotNull(negotiator, "negotiator");
229     this.freezeProtocolNegotiatorFactory = true;
230   }
231 
232   @Internal
233   @Override
delegate()234   protected ManagedChannelBuilder<?> delegate() {
235     return managedChannelImplBuilder;
236   }
237 
getAuthorityFromAddress(SocketAddress address)238   private static String getAuthorityFromAddress(SocketAddress address) {
239     if (address instanceof InetSocketAddress) {
240       InetSocketAddress inetAddress = (InetSocketAddress) address;
241       return GrpcUtil.authorityFromHostAndPort(inetAddress.getHostString(), inetAddress.getPort());
242     } else {
243       return address.toString();
244     }
245   }
246 
247   /**
248    * Specifies the channel type to use, by default we use {@code EpollSocketChannel} if available,
249    * otherwise using {@link NioSocketChannel}.
250    *
251    * <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
252    * {@link Channel} implementation has no no-args constructor.
253    *
254    * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
255    * when the channel is built, the builder will use the default one which is static.
256    *
257    * <p>You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example,
258    * {@link NioSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup}, otherwise
259    * your application won't start.
260    */
261   @CanIgnoreReturnValue
channelType(Class<? extends Channel> channelType)262   public NettyChannelBuilder channelType(Class<? extends Channel> channelType) {
263     checkNotNull(channelType, "channelType");
264     return channelFactory(new ReflectiveChannelFactory<>(channelType));
265   }
266 
267   /**
268    * Specifies the {@link ChannelFactory} to create {@link Channel} instances. This method is
269    * usually only used if the specific {@code Channel} requires complex logic which requires
270    * additional information to create the {@code Channel}. Otherwise, recommend to use {@link
271    * #channelType(Class)}.
272    *
273    * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory
274    * when the channel is built, the builder will use the default one which is static.
275    *
276    * <p>You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example,
277    * {@link NioSocketChannel} based {@link ChannelFactory} must use {@link
278    * io.netty.channel.nio.NioEventLoopGroup}, otherwise your application won't start.
279    */
280   @CanIgnoreReturnValue
channelFactory(ChannelFactory<? extends Channel> channelFactory)281   public NettyChannelBuilder channelFactory(ChannelFactory<? extends Channel> channelFactory) {
282     this.channelFactory = checkNotNull(channelFactory, "channelFactory");
283     return this;
284   }
285 
286   /**
287    * Specifies a channel option. As the underlying channel as well as network implementation may
288    * ignore this value applications should consider it a hint.
289    */
290   @CanIgnoreReturnValue
withOption(ChannelOption<T> option, T value)291   public <T> NettyChannelBuilder withOption(ChannelOption<T> option, T value) {
292     channelOptions.put(option, value);
293     return this;
294   }
295 
296   /**
297    * Sets the negotiation type for the HTTP/2 connection.
298    *
299    * <p>Default: <code>TLS</code>
300    */
301   @CanIgnoreReturnValue
negotiationType(NegotiationType type)302   public NettyChannelBuilder negotiationType(NegotiationType type) {
303     checkState(!freezeProtocolNegotiatorFactory,
304                "Cannot change security when using ChannelCredentials");
305     if (!(protocolNegotiatorFactory instanceof DefaultProtocolNegotiator)) {
306       // Do nothing for compatibility
307       return this;
308     }
309     ((DefaultProtocolNegotiator) protocolNegotiatorFactory).negotiationType = type;
310     return this;
311   }
312 
313   /**
314    * Provides an EventGroupLoop to be used by the netty transport.
315    *
316    * <p>It's an optional parameter. If the user has not provided an EventGroupLoop when the channel
317    * is built, the builder will use the default one which is static.
318    *
319    * <p>You must also provide corresponding {@link #channelType(Class)} or {@link
320    * #channelFactory(ChannelFactory)} corresponding to the given {@code EventLoopGroup}. For
321    * example, {@link io.netty.channel.nio.NioEventLoopGroup} requires {@link NioSocketChannel}
322    *
323    * <p>The channel won't take ownership of the given EventLoopGroup. It's caller's responsibility
324    * to shut it down when it's desired.
325    */
326   @CanIgnoreReturnValue
eventLoopGroup(@ullable EventLoopGroup eventLoopGroup)327   public NettyChannelBuilder eventLoopGroup(@Nullable EventLoopGroup eventLoopGroup) {
328     if (eventLoopGroup != null) {
329       return eventLoopGroupPool(new FixedObjectPool<>(eventLoopGroup));
330     }
331     return eventLoopGroupPool(DEFAULT_EVENT_LOOP_GROUP_POOL);
332   }
333 
334   @CanIgnoreReturnValue
eventLoopGroupPool(ObjectPool<? extends EventLoopGroup> eventLoopGroupPool)335   NettyChannelBuilder eventLoopGroupPool(ObjectPool<? extends EventLoopGroup> eventLoopGroupPool) {
336     this.eventLoopGroupPool = checkNotNull(eventLoopGroupPool, "eventLoopGroupPool");
337     return this;
338   }
339 
340   /**
341    * SSL/TLS context to use instead of the system default. It must have been configured with {@link
342    * GrpcSslContexts}, but options could have been overridden.
343    */
344   @CanIgnoreReturnValue
sslContext(SslContext sslContext)345   public NettyChannelBuilder sslContext(SslContext sslContext) {
346     checkState(!freezeProtocolNegotiatorFactory,
347                "Cannot change security when using ChannelCredentials");
348     if (sslContext != null) {
349       checkArgument(sslContext.isClient(),
350           "Server SSL context can not be used for client channel");
351       GrpcSslContexts.ensureAlpnAndH2Enabled(sslContext.applicationProtocolNegotiator());
352     }
353     if (!(protocolNegotiatorFactory instanceof DefaultProtocolNegotiator)) {
354       // Do nothing for compatibility
355       return this;
356     }
357     ((DefaultProtocolNegotiator) protocolNegotiatorFactory).sslContext = sslContext;
358     return this;
359   }
360 
361   /**
362    * Sets the initial flow control window in bytes. Setting initial flow control window enables auto
363    * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control
364    * tuning, use {@link #flowControlWindow(int)}. By default, auto flow control is enabled with
365    * initial flow control window size of {@link #DEFAULT_FLOW_CONTROL_WINDOW}.
366    */
367   @CanIgnoreReturnValue
initialFlowControlWindow(int initialFlowControlWindow)368   public NettyChannelBuilder initialFlowControlWindow(int initialFlowControlWindow) {
369     checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive");
370     this.flowControlWindow = initialFlowControlWindow;
371     this.autoFlowControl = true;
372     return this;
373   }
374 
375   /**
376    * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control
377    * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not
378    * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}) with auto flow control
379    * tuning.
380    */
381   @CanIgnoreReturnValue
flowControlWindow(int flowControlWindow)382   public NettyChannelBuilder flowControlWindow(int flowControlWindow) {
383     checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
384     this.flowControlWindow = flowControlWindow;
385     this.autoFlowControl = false;
386     return this;
387   }
388 
389   /**
390    * Sets the maximum size of header list allowed to be received. This is cumulative size of the
391    * headers with some overhead, as defined for
392    * <a href="http://httpwg.org/specs/rfc7540.html#rfc.section.6.5.2">
393    * HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE</a>. The default is 8 KiB.
394    *
395    * @deprecated Use {@link #maxInboundMetadataSize} instead
396    */
397   @CanIgnoreReturnValue
398   @Deprecated
399   @InlineMe(replacement = "this.maxInboundMetadataSize(maxHeaderListSize)")
maxHeaderListSize(int maxHeaderListSize)400   public NettyChannelBuilder maxHeaderListSize(int maxHeaderListSize) {
401     return maxInboundMetadataSize(maxHeaderListSize);
402   }
403 
404   /**
405    * Sets the maximum size of metadata allowed to be received. This is cumulative size of the
406    * entries with some overhead, as defined for
407    * <a href="http://httpwg.org/specs/rfc7540.html#rfc.section.6.5.2">
408    * HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE</a>. The default is 8 KiB.
409    *
410    * @param bytes the maximum size of received metadata
411    * @return this
412    * @throws IllegalArgumentException if bytes is non-positive
413    * @since 1.17.0
414    */
415   @CanIgnoreReturnValue
416   @Override
maxInboundMetadataSize(int bytes)417   public NettyChannelBuilder maxInboundMetadataSize(int bytes) {
418     checkArgument(bytes > 0, "maxInboundMetadataSize must be > 0");
419     this.maxHeaderListSize = bytes;
420     return this;
421   }
422 
423   /**
424    * Equivalent to using {@link #negotiationType(NegotiationType)} with {@code PLAINTEXT}.
425    */
426   @CanIgnoreReturnValue
427   @Override
usePlaintext()428   public NettyChannelBuilder usePlaintext() {
429     negotiationType(NegotiationType.PLAINTEXT);
430     return this;
431   }
432 
433   /**
434    * Equivalent to using {@link #negotiationType(NegotiationType)} with {@code TLS}.
435    */
436   @CanIgnoreReturnValue
437   @Override
useTransportSecurity()438   public NettyChannelBuilder useTransportSecurity() {
439     negotiationType(NegotiationType.TLS);
440     return this;
441   }
442 
443   /**
444    * {@inheritDoc}
445    *
446    * @since 1.3.0
447    */
448   @CanIgnoreReturnValue
449   @Override
keepAliveTime(long keepAliveTime, TimeUnit timeUnit)450   public NettyChannelBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
451     checkArgument(keepAliveTime > 0L, "keepalive time must be positive");
452     keepAliveTimeNanos = timeUnit.toNanos(keepAliveTime);
453     keepAliveTimeNanos = KeepAliveManager.clampKeepAliveTimeInNanos(keepAliveTimeNanos);
454     if (keepAliveTimeNanos >= AS_LARGE_AS_INFINITE) {
455       // Bump keepalive time to infinite. This disables keepalive.
456       keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
457     }
458     return this;
459   }
460 
461   /**
462    * {@inheritDoc}
463    *
464    * @since 1.3.0
465    */
466   @CanIgnoreReturnValue
467   @Override
keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit)468   public NettyChannelBuilder keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit) {
469     checkArgument(keepAliveTimeout > 0L, "keepalive timeout must be positive");
470     keepAliveTimeoutNanos = timeUnit.toNanos(keepAliveTimeout);
471     keepAliveTimeoutNanos = KeepAliveManager.clampKeepAliveTimeoutInNanos(keepAliveTimeoutNanos);
472     return this;
473   }
474 
475   /**
476    * {@inheritDoc}
477    *
478    * @since 1.3.0
479    */
480   @CanIgnoreReturnValue
481   @Override
keepAliveWithoutCalls(boolean enable)482   public NettyChannelBuilder keepAliveWithoutCalls(boolean enable) {
483     keepAliveWithoutCalls = enable;
484     return this;
485   }
486 
487 
488   /**
489    * If non-{@code null}, attempts to create connections bound to a local port.
490    */
491   @CanIgnoreReturnValue
localSocketPicker(@ullable LocalSocketPicker localSocketPicker)492   public NettyChannelBuilder localSocketPicker(@Nullable LocalSocketPicker localSocketPicker) {
493     this.localSocketPicker = localSocketPicker;
494     return this;
495   }
496 
497   /**
498    * This class is meant to be overriden with a custom implementation of
499    * {@link #createSocketAddress}.  The default implementation is a no-op.
500    *
501    * @since 1.16.0
502    */
503   @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4917")
504   public static class LocalSocketPicker {
505 
506     /**
507      * Called by gRPC to pick local socket to bind to.  This may be called multiple times.
508      * Subclasses are expected to override this method.
509      *
510      * @param remoteAddress the remote address to connect to.
511      * @param attrs the Attributes present on the {@link io.grpc.EquivalentAddressGroup} associated
512      *        with the address.
513      * @return a {@link SocketAddress} suitable for binding, or else {@code null}.
514      * @since 1.16.0
515      */
516     @Nullable
createSocketAddress( SocketAddress remoteAddress, @EquivalentAddressGroup.Attr Attributes attrs)517     public SocketAddress createSocketAddress(
518         SocketAddress remoteAddress, @EquivalentAddressGroup.Attr Attributes attrs) {
519       return null;
520     }
521   }
522 
523   /**
524    * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages larger
525    * than this limit is received it will not be processed and the RPC will fail with
526    * RESOURCE_EXHAUSTED.
527    */
528   @CanIgnoreReturnValue
529   @Override
maxInboundMessageSize(int max)530   public NettyChannelBuilder maxInboundMessageSize(int max) {
531     checkArgument(max >= 0, "negative max");
532     maxInboundMessageSize = max;
533     return this;
534   }
535 
buildTransportFactory()536   ClientTransportFactory buildTransportFactory() {
537     assertEventLoopAndChannelType();
538 
539     ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator();
540     return new NettyTransportFactory(
541         negotiator, channelFactory, channelOptions,
542         eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize,
543         maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
544         transportTracerFactory, localSocketPicker, useGetForSafeMethods);
545   }
546 
547   @VisibleForTesting
assertEventLoopAndChannelType()548   void assertEventLoopAndChannelType() {
549     boolean bothProvided = channelFactory != DEFAULT_CHANNEL_FACTORY
550         && eventLoopGroupPool != DEFAULT_EVENT_LOOP_GROUP_POOL;
551     boolean nonProvided = channelFactory == DEFAULT_CHANNEL_FACTORY
552         && eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL;
553     checkState(
554         bothProvided || nonProvided,
555         "Both EventLoopGroup and ChannelType should be provided or neither should be");
556   }
557 
getDefaultPort()558   int getDefaultPort() {
559     return protocolNegotiatorFactory.getDefaultPort();
560   }
561 
562   @VisibleForTesting
createProtocolNegotiatorByType( NegotiationType negotiationType, SslContext sslContext, ObjectPool<? extends Executor> executorPool)563   static ProtocolNegotiator createProtocolNegotiatorByType(
564       NegotiationType negotiationType,
565       SslContext sslContext,
566       ObjectPool<? extends Executor> executorPool) {
567     switch (negotiationType) {
568       case PLAINTEXT:
569         return ProtocolNegotiators.plaintext();
570       case PLAINTEXT_UPGRADE:
571         return ProtocolNegotiators.plaintextUpgrade();
572       case TLS:
573         return ProtocolNegotiators.tls(sslContext, executorPool);
574       default:
575         throw new IllegalArgumentException("Unsupported negotiationType: " + negotiationType);
576     }
577   }
578 
579   @CanIgnoreReturnValue
disableCheckAuthority()580   NettyChannelBuilder disableCheckAuthority() {
581     this.managedChannelImplBuilder.disableCheckAuthority();
582     return this;
583   }
584 
585   @CanIgnoreReturnValue
enableCheckAuthority()586   NettyChannelBuilder enableCheckAuthority() {
587     this.managedChannelImplBuilder.enableCheckAuthority();
588     return this;
589   }
590 
protocolNegotiatorFactory(ProtocolNegotiator.ClientFactory protocolNegotiatorFactory)591   void protocolNegotiatorFactory(ProtocolNegotiator.ClientFactory protocolNegotiatorFactory) {
592     checkState(!freezeProtocolNegotiatorFactory,
593                "Cannot change security when using ChannelCredentials");
594     this.protocolNegotiatorFactory
595         = checkNotNull(protocolNegotiatorFactory, "protocolNegotiatorFactory");
596   }
597 
setTracingEnabled(boolean value)598   void setTracingEnabled(boolean value) {
599     this.managedChannelImplBuilder.setTracingEnabled(value);
600   }
601 
setStatsEnabled(boolean value)602   void setStatsEnabled(boolean value) {
603     this.managedChannelImplBuilder.setStatsEnabled(value);
604   }
605 
setStatsRecordStartedRpcs(boolean value)606   void setStatsRecordStartedRpcs(boolean value) {
607     this.managedChannelImplBuilder.setStatsRecordStartedRpcs(value);
608   }
609 
setStatsRecordFinishedRpcs(boolean value)610   void setStatsRecordFinishedRpcs(boolean value) {
611     this.managedChannelImplBuilder.setStatsRecordFinishedRpcs(value);
612   }
613 
setStatsRecordRealTimeMetrics(boolean value)614   void setStatsRecordRealTimeMetrics(boolean value) {
615     this.managedChannelImplBuilder.setStatsRecordRealTimeMetrics(value);
616   }
617 
setStatsRecordRetryMetrics(boolean value)618   void setStatsRecordRetryMetrics(boolean value) {
619     this.managedChannelImplBuilder.setStatsRecordRetryMetrics(value);
620   }
621 
622   @CanIgnoreReturnValue
623   @VisibleForTesting
setTransportTracerFactory(TransportTracer.Factory transportTracerFactory)624   NettyChannelBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) {
625     this.transportTracerFactory = transportTracerFactory;
626     return this;
627   }
628 
629   private final class DefaultProtocolNegotiator implements ProtocolNegotiator.ClientFactory {
630     private NegotiationType negotiationType = NegotiationType.TLS;
631     private SslContext sslContext;
632 
633     @Override
newNegotiator()634     public ProtocolNegotiator newNegotiator() {
635       SslContext localSslContext = sslContext;
636       if (negotiationType == NegotiationType.TLS && localSslContext == null) {
637         try {
638           localSslContext = GrpcSslContexts.forClient().build();
639         } catch (SSLException ex) {
640           throw new RuntimeException(ex);
641         }
642       }
643       return createProtocolNegotiatorByType(negotiationType, localSslContext,
644           managedChannelImplBuilder.getOffloadExecutorPool());
645     }
646 
647     @Override
getDefaultPort()648     public int getDefaultPort() {
649       switch (negotiationType) {
650         case PLAINTEXT:
651         case PLAINTEXT_UPGRADE:
652           return GrpcUtil.DEFAULT_PORT_PLAINTEXT;
653         case TLS:
654           return GrpcUtil.DEFAULT_PORT_SSL;
655         default:
656           throw new AssertionError(negotiationType + " not handled");
657       }
658     }
659   }
660 
661   /**
662    * Creates Netty transports. Exposed for internal use, as it should be private.
663    */
664   private static final class NettyTransportFactory implements ClientTransportFactory {
665     private final ProtocolNegotiator protocolNegotiator;
666     private final ChannelFactory<? extends Channel> channelFactory;
667     private final Map<ChannelOption<?>, ?> channelOptions;
668     private final ObjectPool<? extends EventLoopGroup> groupPool;
669     private final EventLoopGroup group;
670     private final boolean autoFlowControl;
671     private final int flowControlWindow;
672     private final int maxMessageSize;
673     private final int maxHeaderListSize;
674     private final long keepAliveTimeNanos;
675     private final AtomicBackoff keepAliveBackoff;
676     private final long keepAliveTimeoutNanos;
677     private final boolean keepAliveWithoutCalls;
678     private final TransportTracer.Factory transportTracerFactory;
679     private final LocalSocketPicker localSocketPicker;
680     private final boolean useGetForSafeMethods;
681 
682     private boolean closed;
683 
NettyTransportFactory( ProtocolNegotiator protocolNegotiator, ChannelFactory<? extends Channel> channelFactory, Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool, boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, boolean useGetForSafeMethods)684     NettyTransportFactory(
685         ProtocolNegotiator protocolNegotiator,
686         ChannelFactory<? extends Channel> channelFactory,
687         Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
688         boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
689         long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
690         TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
691         boolean useGetForSafeMethods) {
692       this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
693       this.channelFactory = channelFactory;
694       this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
695       this.groupPool = groupPool;
696       this.group = groupPool.getObject();
697       this.autoFlowControl = autoFlowControl;
698       this.flowControlWindow = flowControlWindow;
699       this.maxMessageSize = maxMessageSize;
700       this.maxHeaderListSize = maxHeaderListSize;
701       this.keepAliveTimeNanos = keepAliveTimeNanos;
702       this.keepAliveBackoff = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos);
703       this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
704       this.keepAliveWithoutCalls = keepAliveWithoutCalls;
705       this.transportTracerFactory = transportTracerFactory;
706       this.localSocketPicker =
707           localSocketPicker != null ? localSocketPicker : new LocalSocketPicker();
708       this.useGetForSafeMethods = useGetForSafeMethods;
709     }
710 
711     @Override
newClientTransport( SocketAddress serverAddress, ClientTransportOptions options, ChannelLogger channelLogger)712     public ConnectionClientTransport newClientTransport(
713         SocketAddress serverAddress, ClientTransportOptions options, ChannelLogger channelLogger) {
714       checkState(!closed, "The transport factory is closed.");
715 
716       ProtocolNegotiator localNegotiator = protocolNegotiator;
717       HttpConnectProxiedSocketAddress proxiedAddr = options.getHttpConnectProxiedSocketAddress();
718       if (proxiedAddr != null) {
719         serverAddress = proxiedAddr.getTargetAddress();
720         localNegotiator = ProtocolNegotiators.httpProxy(
721             proxiedAddr.getProxyAddress(),
722             proxiedAddr.getUsername(),
723             proxiedAddr.getPassword(),
724             protocolNegotiator);
725       }
726 
727       final AtomicBackoff.State keepAliveTimeNanosState = keepAliveBackoff.getState();
728       Runnable tooManyPingsRunnable = new Runnable() {
729         @Override
730         public void run() {
731           keepAliveTimeNanosState.backoff();
732         }
733       };
734 
735       // TODO(carl-mastrangelo): Pass channelLogger in.
736       NettyClientTransport transport = new NettyClientTransport(
737           serverAddress, channelFactory, channelOptions, group,
738           localNegotiator, autoFlowControl, flowControlWindow,
739           maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
740           keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
741           tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(),
742           localSocketPicker, channelLogger, useGetForSafeMethods, Ticker.systemTicker());
743       return transport;
744     }
745 
746     @Override
getScheduledExecutorService()747     public ScheduledExecutorService getScheduledExecutorService() {
748       return group;
749     }
750 
751     @Override
swapChannelCredentials(ChannelCredentials channelCreds)752     public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials channelCreds) {
753       checkNotNull(channelCreds, "channelCreds");
754       FromChannelCredentialsResult result = ProtocolNegotiators.from(channelCreds);
755       if (result.error != null) {
756         return null;
757       }
758       ClientTransportFactory factory = new NettyTransportFactory(
759           result.negotiator.newNegotiator(), channelFactory, channelOptions, groupPool,
760           autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanos,
761           keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory,  localSocketPicker,
762           useGetForSafeMethods);
763       return new SwapChannelCredentialsResult(factory, result.callCredentials);
764     }
765 
766     @Override
close()767     public void close() {
768       if (closed) {
769         return;
770       }
771       closed = true;
772 
773       protocolNegotiator.close();
774       groupPool.returnObject(group);
775     }
776   }
777 }
778