1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; 23 import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; 24 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.base.Ticker; 27 import com.google.errorprone.annotations.CanIgnoreReturnValue; 28 import com.google.errorprone.annotations.CheckReturnValue; 29 import com.google.errorprone.annotations.InlineMe; 30 import io.grpc.Attributes; 31 import io.grpc.CallCredentials; 32 import io.grpc.ChannelCredentials; 33 import io.grpc.ChannelLogger; 34 import io.grpc.EquivalentAddressGroup; 35 import io.grpc.ExperimentalApi; 36 import io.grpc.HttpConnectProxiedSocketAddress; 37 import io.grpc.Internal; 38 import io.grpc.ManagedChannelBuilder; 39 import io.grpc.internal.AbstractManagedChannelImplBuilder; 40 import io.grpc.internal.AtomicBackoff; 41 import io.grpc.internal.ClientTransportFactory; 42 import io.grpc.internal.ConnectionClientTransport; 43 import io.grpc.internal.FixedObjectPool; 44 import io.grpc.internal.GrpcUtil; 45 import io.grpc.internal.KeepAliveManager; 46 import io.grpc.internal.ManagedChannelImplBuilder; 47 import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider; 48 import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder; 49 import io.grpc.internal.ObjectPool; 50 import io.grpc.internal.SharedResourcePool; 51 import io.grpc.internal.TransportTracer; 52 import io.grpc.netty.ProtocolNegotiators.FromChannelCredentialsResult; 53 import io.netty.channel.Channel; 54 import io.netty.channel.ChannelFactory; 55 import io.netty.channel.ChannelOption; 56 import io.netty.channel.EventLoopGroup; 57 import io.netty.channel.ReflectiveChannelFactory; 58 import io.netty.channel.socket.nio.NioSocketChannel; 59 import io.netty.handler.ssl.SslContext; 60 import java.net.InetSocketAddress; 61 import java.net.SocketAddress; 62 import java.util.HashMap; 63 import java.util.Map; 64 import java.util.concurrent.Executor; 65 import java.util.concurrent.ScheduledExecutorService; 66 import java.util.concurrent.TimeUnit; 67 import javax.annotation.Nullable; 68 import javax.net.ssl.SSLException; 69 70 /** 71 * A builder to help simplify construction of channels using the Netty transport. 72 */ 73 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784") 74 @CheckReturnValue 75 public final class NettyChannelBuilder extends 76 AbstractManagedChannelImplBuilder<NettyChannelBuilder> { 77 78 // 1MiB. 79 public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1024 * 1024; 80 private static final boolean DEFAULT_AUTO_FLOW_CONTROL; 81 82 private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L); 83 84 private static final ChannelFactory<? extends Channel> DEFAULT_CHANNEL_FACTORY = 85 new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE); 86 private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL = 87 SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); 88 89 static { 90 String autoFlowControl = System.getenv("GRPC_EXPERIMENTAL_AUTOFLOWCONTROL"); 91 if (autoFlowControl == null) { 92 autoFlowControl = "true"; 93 } 94 DEFAULT_AUTO_FLOW_CONTROL = Boolean.parseBoolean(autoFlowControl); 95 } 96 97 private final ManagedChannelImplBuilder managedChannelImplBuilder; 98 private TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory(); 99 private final Map<ChannelOption<?>, Object> channelOptions = new HashMap<>(); 100 private ChannelFactory<? extends Channel> channelFactory = DEFAULT_CHANNEL_FACTORY; 101 private ObjectPool<? extends EventLoopGroup> eventLoopGroupPool = DEFAULT_EVENT_LOOP_GROUP_POOL; 102 private boolean autoFlowControl = DEFAULT_AUTO_FLOW_CONTROL; 103 private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; 104 private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE; 105 private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; 106 private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS; 107 private boolean keepAliveWithoutCalls; 108 private ProtocolNegotiator.ClientFactory protocolNegotiatorFactory 109 = new DefaultProtocolNegotiator(); 110 private final boolean freezeProtocolNegotiatorFactory; 111 private LocalSocketPicker localSocketPicker; 112 113 /** 114 * If true, indicates that the transport may use the GET method for RPCs, and may include the 115 * request body in the query params. 116 */ 117 private final boolean useGetForSafeMethods = false; 118 119 /** 120 * Creates a new builder with the given server address. This factory method is primarily intended 121 * for using Netty Channel types other than SocketChannel. {@link #forAddress(String, int)} should 122 * generally be preferred over this method, since that API permits delaying DNS lookups and 123 * noticing changes to DNS. If an unresolved InetSocketAddress is passed in, then it will remain 124 * unresolved. 125 */ forAddress(SocketAddress serverAddress)126 public static NettyChannelBuilder forAddress(SocketAddress serverAddress) { 127 return new NettyChannelBuilder(serverAddress); 128 } 129 130 /** 131 * Creates a new builder with the given server address. This factory method is primarily intended 132 * for using Netty Channel types other than SocketChannel. 133 * {@link #forAddress(String, int, ChannelCredentials)} should generally be preferred over this 134 * method, since that API permits delaying DNS lookups and noticing changes to DNS. If an 135 * unresolved InetSocketAddress is passed in, then it will remain unresolved. 136 */ forAddress(SocketAddress serverAddress, ChannelCredentials creds)137 public static NettyChannelBuilder forAddress(SocketAddress serverAddress, 138 ChannelCredentials creds) { 139 FromChannelCredentialsResult result = ProtocolNegotiators.from(creds); 140 if (result.error != null) { 141 throw new IllegalArgumentException(result.error); 142 } 143 return new NettyChannelBuilder(serverAddress, creds, result.callCredentials, result.negotiator); 144 } 145 146 /** 147 * Creates a new builder with the given host and port. 148 */ forAddress(String host, int port)149 public static NettyChannelBuilder forAddress(String host, int port) { 150 return forTarget(GrpcUtil.authorityFromHostAndPort(host, port)); 151 } 152 153 /** 154 * Creates a new builder with the given host and port. 155 */ forAddress(String host, int port, ChannelCredentials creds)156 public static NettyChannelBuilder forAddress(String host, int port, ChannelCredentials creds) { 157 return forTarget(GrpcUtil.authorityFromHostAndPort(host, port), creds); 158 } 159 160 /** 161 * Creates a new builder with the given target string that will be resolved by 162 * {@link io.grpc.NameResolver}. 163 */ forTarget(String target)164 public static NettyChannelBuilder forTarget(String target) { 165 return new NettyChannelBuilder(target); 166 } 167 168 /** 169 * Creates a new builder with the given target string that will be resolved by 170 * {@link io.grpc.NameResolver}. 171 */ forTarget(String target, ChannelCredentials creds)172 public static NettyChannelBuilder forTarget(String target, ChannelCredentials creds) { 173 FromChannelCredentialsResult result = ProtocolNegotiators.from(creds); 174 if (result.error != null) { 175 throw new IllegalArgumentException(result.error); 176 } 177 return new NettyChannelBuilder(target, creds, result.callCredentials, result.negotiator); 178 } 179 180 private final class NettyChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder { 181 @Override buildClientTransportFactory()182 public ClientTransportFactory buildClientTransportFactory() { 183 return buildTransportFactory(); 184 } 185 } 186 187 private final class NettyChannelDefaultPortProvider implements ChannelBuilderDefaultPortProvider { 188 @Override getDefaultPort()189 public int getDefaultPort() { 190 return protocolNegotiatorFactory.getDefaultPort(); 191 } 192 } 193 NettyChannelBuilder(String target)194 NettyChannelBuilder(String target) { 195 managedChannelImplBuilder = new ManagedChannelImplBuilder(target, 196 new NettyChannelTransportFactoryBuilder(), 197 new NettyChannelDefaultPortProvider()); 198 this.freezeProtocolNegotiatorFactory = false; 199 } 200 NettyChannelBuilder( String target, ChannelCredentials channelCreds, CallCredentials callCreds, ProtocolNegotiator.ClientFactory negotiator)201 NettyChannelBuilder( 202 String target, ChannelCredentials channelCreds, CallCredentials callCreds, 203 ProtocolNegotiator.ClientFactory negotiator) { 204 managedChannelImplBuilder = new ManagedChannelImplBuilder( 205 target, channelCreds, callCreds, 206 new NettyChannelTransportFactoryBuilder(), 207 new NettyChannelDefaultPortProvider()); 208 this.protocolNegotiatorFactory = checkNotNull(negotiator, "negotiator"); 209 this.freezeProtocolNegotiatorFactory = true; 210 } 211 NettyChannelBuilder(SocketAddress address)212 NettyChannelBuilder(SocketAddress address) { 213 managedChannelImplBuilder = new ManagedChannelImplBuilder(address, 214 getAuthorityFromAddress(address), 215 new NettyChannelTransportFactoryBuilder(), 216 new NettyChannelDefaultPortProvider()); 217 this.freezeProtocolNegotiatorFactory = false; 218 } 219 NettyChannelBuilder( SocketAddress address, ChannelCredentials channelCreds, CallCredentials callCreds, ProtocolNegotiator.ClientFactory negotiator)220 NettyChannelBuilder( 221 SocketAddress address, ChannelCredentials channelCreds, CallCredentials callCreds, 222 ProtocolNegotiator.ClientFactory negotiator) { 223 managedChannelImplBuilder = new ManagedChannelImplBuilder(address, 224 getAuthorityFromAddress(address), 225 channelCreds, callCreds, 226 new NettyChannelTransportFactoryBuilder(), 227 new NettyChannelDefaultPortProvider()); 228 this.protocolNegotiatorFactory = checkNotNull(negotiator, "negotiator"); 229 this.freezeProtocolNegotiatorFactory = true; 230 } 231 232 @Internal 233 @Override delegate()234 protected ManagedChannelBuilder<?> delegate() { 235 return managedChannelImplBuilder; 236 } 237 getAuthorityFromAddress(SocketAddress address)238 private static String getAuthorityFromAddress(SocketAddress address) { 239 if (address instanceof InetSocketAddress) { 240 InetSocketAddress inetAddress = (InetSocketAddress) address; 241 return GrpcUtil.authorityFromHostAndPort(inetAddress.getHostString(), inetAddress.getPort()); 242 } else { 243 return address.toString(); 244 } 245 } 246 247 /** 248 * Specifies the channel type to use, by default we use {@code EpollSocketChannel} if available, 249 * otherwise using {@link NioSocketChannel}. 250 * 251 * <p>You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your 252 * {@link Channel} implementation has no no-args constructor. 253 * 254 * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory 255 * when the channel is built, the builder will use the default one which is static. 256 * 257 * <p>You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example, 258 * {@link NioSocketChannel} must use {@link io.netty.channel.nio.NioEventLoopGroup}, otherwise 259 * your application won't start. 260 */ 261 @CanIgnoreReturnValue channelType(Class<? extends Channel> channelType)262 public NettyChannelBuilder channelType(Class<? extends Channel> channelType) { 263 checkNotNull(channelType, "channelType"); 264 return channelFactory(new ReflectiveChannelFactory<>(channelType)); 265 } 266 267 /** 268 * Specifies the {@link ChannelFactory} to create {@link Channel} instances. This method is 269 * usually only used if the specific {@code Channel} requires complex logic which requires 270 * additional information to create the {@code Channel}. Otherwise, recommend to use {@link 271 * #channelType(Class)}. 272 * 273 * <p>It's an optional parameter. If the user has not provided an Channel type or ChannelFactory 274 * when the channel is built, the builder will use the default one which is static. 275 * 276 * <p>You must also provide corresponding {@link #eventLoopGroup(EventLoopGroup)}. For example, 277 * {@link NioSocketChannel} based {@link ChannelFactory} must use {@link 278 * io.netty.channel.nio.NioEventLoopGroup}, otherwise your application won't start. 279 */ 280 @CanIgnoreReturnValue channelFactory(ChannelFactory<? extends Channel> channelFactory)281 public NettyChannelBuilder channelFactory(ChannelFactory<? extends Channel> channelFactory) { 282 this.channelFactory = checkNotNull(channelFactory, "channelFactory"); 283 return this; 284 } 285 286 /** 287 * Specifies a channel option. As the underlying channel as well as network implementation may 288 * ignore this value applications should consider it a hint. 289 */ 290 @CanIgnoreReturnValue withOption(ChannelOption<T> option, T value)291 public <T> NettyChannelBuilder withOption(ChannelOption<T> option, T value) { 292 channelOptions.put(option, value); 293 return this; 294 } 295 296 /** 297 * Sets the negotiation type for the HTTP/2 connection. 298 * 299 * <p>Default: <code>TLS</code> 300 */ 301 @CanIgnoreReturnValue negotiationType(NegotiationType type)302 public NettyChannelBuilder negotiationType(NegotiationType type) { 303 checkState(!freezeProtocolNegotiatorFactory, 304 "Cannot change security when using ChannelCredentials"); 305 if (!(protocolNegotiatorFactory instanceof DefaultProtocolNegotiator)) { 306 // Do nothing for compatibility 307 return this; 308 } 309 ((DefaultProtocolNegotiator) protocolNegotiatorFactory).negotiationType = type; 310 return this; 311 } 312 313 /** 314 * Provides an EventGroupLoop to be used by the netty transport. 315 * 316 * <p>It's an optional parameter. If the user has not provided an EventGroupLoop when the channel 317 * is built, the builder will use the default one which is static. 318 * 319 * <p>You must also provide corresponding {@link #channelType(Class)} or {@link 320 * #channelFactory(ChannelFactory)} corresponding to the given {@code EventLoopGroup}. For 321 * example, {@link io.netty.channel.nio.NioEventLoopGroup} requires {@link NioSocketChannel} 322 * 323 * <p>The channel won't take ownership of the given EventLoopGroup. It's caller's responsibility 324 * to shut it down when it's desired. 325 */ 326 @CanIgnoreReturnValue eventLoopGroup(@ullable EventLoopGroup eventLoopGroup)327 public NettyChannelBuilder eventLoopGroup(@Nullable EventLoopGroup eventLoopGroup) { 328 if (eventLoopGroup != null) { 329 return eventLoopGroupPool(new FixedObjectPool<>(eventLoopGroup)); 330 } 331 return eventLoopGroupPool(DEFAULT_EVENT_LOOP_GROUP_POOL); 332 } 333 334 @CanIgnoreReturnValue eventLoopGroupPool(ObjectPool<? extends EventLoopGroup> eventLoopGroupPool)335 NettyChannelBuilder eventLoopGroupPool(ObjectPool<? extends EventLoopGroup> eventLoopGroupPool) { 336 this.eventLoopGroupPool = checkNotNull(eventLoopGroupPool, "eventLoopGroupPool"); 337 return this; 338 } 339 340 /** 341 * SSL/TLS context to use instead of the system default. It must have been configured with {@link 342 * GrpcSslContexts}, but options could have been overridden. 343 */ 344 @CanIgnoreReturnValue sslContext(SslContext sslContext)345 public NettyChannelBuilder sslContext(SslContext sslContext) { 346 checkState(!freezeProtocolNegotiatorFactory, 347 "Cannot change security when using ChannelCredentials"); 348 if (sslContext != null) { 349 checkArgument(sslContext.isClient(), 350 "Server SSL context can not be used for client channel"); 351 GrpcSslContexts.ensureAlpnAndH2Enabled(sslContext.applicationProtocolNegotiator()); 352 } 353 if (!(protocolNegotiatorFactory instanceof DefaultProtocolNegotiator)) { 354 // Do nothing for compatibility 355 return this; 356 } 357 ((DefaultProtocolNegotiator) protocolNegotiatorFactory).sslContext = sslContext; 358 return this; 359 } 360 361 /** 362 * Sets the initial flow control window in bytes. Setting initial flow control window enables auto 363 * flow control tuning using bandwidth-delay product algorithm. To disable auto flow control 364 * tuning, use {@link #flowControlWindow(int)}. By default, auto flow control is enabled with 365 * initial flow control window size of {@link #DEFAULT_FLOW_CONTROL_WINDOW}. 366 */ 367 @CanIgnoreReturnValue initialFlowControlWindow(int initialFlowControlWindow)368 public NettyChannelBuilder initialFlowControlWindow(int initialFlowControlWindow) { 369 checkArgument(initialFlowControlWindow > 0, "initialFlowControlWindow must be positive"); 370 this.flowControlWindow = initialFlowControlWindow; 371 this.autoFlowControl = true; 372 return this; 373 } 374 375 /** 376 * Sets the flow control window in bytes. Setting flowControlWindow disables auto flow control 377 * tuning; use {@link #initialFlowControlWindow(int)} to enable auto flow control tuning. If not 378 * called, the default value is {@link #DEFAULT_FLOW_CONTROL_WINDOW}) with auto flow control 379 * tuning. 380 */ 381 @CanIgnoreReturnValue flowControlWindow(int flowControlWindow)382 public NettyChannelBuilder flowControlWindow(int flowControlWindow) { 383 checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); 384 this.flowControlWindow = flowControlWindow; 385 this.autoFlowControl = false; 386 return this; 387 } 388 389 /** 390 * Sets the maximum size of header list allowed to be received. This is cumulative size of the 391 * headers with some overhead, as defined for 392 * <a href="http://httpwg.org/specs/rfc7540.html#rfc.section.6.5.2"> 393 * HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE</a>. The default is 8 KiB. 394 * 395 * @deprecated Use {@link #maxInboundMetadataSize} instead 396 */ 397 @CanIgnoreReturnValue 398 @Deprecated 399 @InlineMe(replacement = "this.maxInboundMetadataSize(maxHeaderListSize)") maxHeaderListSize(int maxHeaderListSize)400 public NettyChannelBuilder maxHeaderListSize(int maxHeaderListSize) { 401 return maxInboundMetadataSize(maxHeaderListSize); 402 } 403 404 /** 405 * Sets the maximum size of metadata allowed to be received. This is cumulative size of the 406 * entries with some overhead, as defined for 407 * <a href="http://httpwg.org/specs/rfc7540.html#rfc.section.6.5.2"> 408 * HTTP/2's SETTINGS_MAX_HEADER_LIST_SIZE</a>. The default is 8 KiB. 409 * 410 * @param bytes the maximum size of received metadata 411 * @return this 412 * @throws IllegalArgumentException if bytes is non-positive 413 * @since 1.17.0 414 */ 415 @CanIgnoreReturnValue 416 @Override maxInboundMetadataSize(int bytes)417 public NettyChannelBuilder maxInboundMetadataSize(int bytes) { 418 checkArgument(bytes > 0, "maxInboundMetadataSize must be > 0"); 419 this.maxHeaderListSize = bytes; 420 return this; 421 } 422 423 /** 424 * Equivalent to using {@link #negotiationType(NegotiationType)} with {@code PLAINTEXT}. 425 */ 426 @CanIgnoreReturnValue 427 @Override usePlaintext()428 public NettyChannelBuilder usePlaintext() { 429 negotiationType(NegotiationType.PLAINTEXT); 430 return this; 431 } 432 433 /** 434 * Equivalent to using {@link #negotiationType(NegotiationType)} with {@code TLS}. 435 */ 436 @CanIgnoreReturnValue 437 @Override useTransportSecurity()438 public NettyChannelBuilder useTransportSecurity() { 439 negotiationType(NegotiationType.TLS); 440 return this; 441 } 442 443 /** 444 * {@inheritDoc} 445 * 446 * @since 1.3.0 447 */ 448 @CanIgnoreReturnValue 449 @Override keepAliveTime(long keepAliveTime, TimeUnit timeUnit)450 public NettyChannelBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit) { 451 checkArgument(keepAliveTime > 0L, "keepalive time must be positive"); 452 keepAliveTimeNanos = timeUnit.toNanos(keepAliveTime); 453 keepAliveTimeNanos = KeepAliveManager.clampKeepAliveTimeInNanos(keepAliveTimeNanos); 454 if (keepAliveTimeNanos >= AS_LARGE_AS_INFINITE) { 455 // Bump keepalive time to infinite. This disables keepalive. 456 keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; 457 } 458 return this; 459 } 460 461 /** 462 * {@inheritDoc} 463 * 464 * @since 1.3.0 465 */ 466 @CanIgnoreReturnValue 467 @Override keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit)468 public NettyChannelBuilder keepAliveTimeout(long keepAliveTimeout, TimeUnit timeUnit) { 469 checkArgument(keepAliveTimeout > 0L, "keepalive timeout must be positive"); 470 keepAliveTimeoutNanos = timeUnit.toNanos(keepAliveTimeout); 471 keepAliveTimeoutNanos = KeepAliveManager.clampKeepAliveTimeoutInNanos(keepAliveTimeoutNanos); 472 return this; 473 } 474 475 /** 476 * {@inheritDoc} 477 * 478 * @since 1.3.0 479 */ 480 @CanIgnoreReturnValue 481 @Override keepAliveWithoutCalls(boolean enable)482 public NettyChannelBuilder keepAliveWithoutCalls(boolean enable) { 483 keepAliveWithoutCalls = enable; 484 return this; 485 } 486 487 488 /** 489 * If non-{@code null}, attempts to create connections bound to a local port. 490 */ 491 @CanIgnoreReturnValue localSocketPicker(@ullable LocalSocketPicker localSocketPicker)492 public NettyChannelBuilder localSocketPicker(@Nullable LocalSocketPicker localSocketPicker) { 493 this.localSocketPicker = localSocketPicker; 494 return this; 495 } 496 497 /** 498 * This class is meant to be overriden with a custom implementation of 499 * {@link #createSocketAddress}. The default implementation is a no-op. 500 * 501 * @since 1.16.0 502 */ 503 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4917") 504 public static class LocalSocketPicker { 505 506 /** 507 * Called by gRPC to pick local socket to bind to. This may be called multiple times. 508 * Subclasses are expected to override this method. 509 * 510 * @param remoteAddress the remote address to connect to. 511 * @param attrs the Attributes present on the {@link io.grpc.EquivalentAddressGroup} associated 512 * with the address. 513 * @return a {@link SocketAddress} suitable for binding, or else {@code null}. 514 * @since 1.16.0 515 */ 516 @Nullable createSocketAddress( SocketAddress remoteAddress, @EquivalentAddressGroup.Attr Attributes attrs)517 public SocketAddress createSocketAddress( 518 SocketAddress remoteAddress, @EquivalentAddressGroup.Attr Attributes attrs) { 519 return null; 520 } 521 } 522 523 /** 524 * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages larger 525 * than this limit is received it will not be processed and the RPC will fail with 526 * RESOURCE_EXHAUSTED. 527 */ 528 @CanIgnoreReturnValue 529 @Override maxInboundMessageSize(int max)530 public NettyChannelBuilder maxInboundMessageSize(int max) { 531 checkArgument(max >= 0, "negative max"); 532 maxInboundMessageSize = max; 533 return this; 534 } 535 buildTransportFactory()536 ClientTransportFactory buildTransportFactory() { 537 assertEventLoopAndChannelType(); 538 539 ProtocolNegotiator negotiator = protocolNegotiatorFactory.newNegotiator(); 540 return new NettyTransportFactory( 541 negotiator, channelFactory, channelOptions, 542 eventLoopGroupPool, autoFlowControl, flowControlWindow, maxInboundMessageSize, 543 maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, 544 transportTracerFactory, localSocketPicker, useGetForSafeMethods); 545 } 546 547 @VisibleForTesting assertEventLoopAndChannelType()548 void assertEventLoopAndChannelType() { 549 boolean bothProvided = channelFactory != DEFAULT_CHANNEL_FACTORY 550 && eventLoopGroupPool != DEFAULT_EVENT_LOOP_GROUP_POOL; 551 boolean nonProvided = channelFactory == DEFAULT_CHANNEL_FACTORY 552 && eventLoopGroupPool == DEFAULT_EVENT_LOOP_GROUP_POOL; 553 checkState( 554 bothProvided || nonProvided, 555 "Both EventLoopGroup and ChannelType should be provided or neither should be"); 556 } 557 getDefaultPort()558 int getDefaultPort() { 559 return protocolNegotiatorFactory.getDefaultPort(); 560 } 561 562 @VisibleForTesting createProtocolNegotiatorByType( NegotiationType negotiationType, SslContext sslContext, ObjectPool<? extends Executor> executorPool)563 static ProtocolNegotiator createProtocolNegotiatorByType( 564 NegotiationType negotiationType, 565 SslContext sslContext, 566 ObjectPool<? extends Executor> executorPool) { 567 switch (negotiationType) { 568 case PLAINTEXT: 569 return ProtocolNegotiators.plaintext(); 570 case PLAINTEXT_UPGRADE: 571 return ProtocolNegotiators.plaintextUpgrade(); 572 case TLS: 573 return ProtocolNegotiators.tls(sslContext, executorPool); 574 default: 575 throw new IllegalArgumentException("Unsupported negotiationType: " + negotiationType); 576 } 577 } 578 579 @CanIgnoreReturnValue disableCheckAuthority()580 NettyChannelBuilder disableCheckAuthority() { 581 this.managedChannelImplBuilder.disableCheckAuthority(); 582 return this; 583 } 584 585 @CanIgnoreReturnValue enableCheckAuthority()586 NettyChannelBuilder enableCheckAuthority() { 587 this.managedChannelImplBuilder.enableCheckAuthority(); 588 return this; 589 } 590 protocolNegotiatorFactory(ProtocolNegotiator.ClientFactory protocolNegotiatorFactory)591 void protocolNegotiatorFactory(ProtocolNegotiator.ClientFactory protocolNegotiatorFactory) { 592 checkState(!freezeProtocolNegotiatorFactory, 593 "Cannot change security when using ChannelCredentials"); 594 this.protocolNegotiatorFactory 595 = checkNotNull(protocolNegotiatorFactory, "protocolNegotiatorFactory"); 596 } 597 setTracingEnabled(boolean value)598 void setTracingEnabled(boolean value) { 599 this.managedChannelImplBuilder.setTracingEnabled(value); 600 } 601 setStatsEnabled(boolean value)602 void setStatsEnabled(boolean value) { 603 this.managedChannelImplBuilder.setStatsEnabled(value); 604 } 605 setStatsRecordStartedRpcs(boolean value)606 void setStatsRecordStartedRpcs(boolean value) { 607 this.managedChannelImplBuilder.setStatsRecordStartedRpcs(value); 608 } 609 setStatsRecordFinishedRpcs(boolean value)610 void setStatsRecordFinishedRpcs(boolean value) { 611 this.managedChannelImplBuilder.setStatsRecordFinishedRpcs(value); 612 } 613 setStatsRecordRealTimeMetrics(boolean value)614 void setStatsRecordRealTimeMetrics(boolean value) { 615 this.managedChannelImplBuilder.setStatsRecordRealTimeMetrics(value); 616 } 617 setStatsRecordRetryMetrics(boolean value)618 void setStatsRecordRetryMetrics(boolean value) { 619 this.managedChannelImplBuilder.setStatsRecordRetryMetrics(value); 620 } 621 622 @CanIgnoreReturnValue 623 @VisibleForTesting setTransportTracerFactory(TransportTracer.Factory transportTracerFactory)624 NettyChannelBuilder setTransportTracerFactory(TransportTracer.Factory transportTracerFactory) { 625 this.transportTracerFactory = transportTracerFactory; 626 return this; 627 } 628 629 private final class DefaultProtocolNegotiator implements ProtocolNegotiator.ClientFactory { 630 private NegotiationType negotiationType = NegotiationType.TLS; 631 private SslContext sslContext; 632 633 @Override newNegotiator()634 public ProtocolNegotiator newNegotiator() { 635 SslContext localSslContext = sslContext; 636 if (negotiationType == NegotiationType.TLS && localSslContext == null) { 637 try { 638 localSslContext = GrpcSslContexts.forClient().build(); 639 } catch (SSLException ex) { 640 throw new RuntimeException(ex); 641 } 642 } 643 return createProtocolNegotiatorByType(negotiationType, localSslContext, 644 managedChannelImplBuilder.getOffloadExecutorPool()); 645 } 646 647 @Override getDefaultPort()648 public int getDefaultPort() { 649 switch (negotiationType) { 650 case PLAINTEXT: 651 case PLAINTEXT_UPGRADE: 652 return GrpcUtil.DEFAULT_PORT_PLAINTEXT; 653 case TLS: 654 return GrpcUtil.DEFAULT_PORT_SSL; 655 default: 656 throw new AssertionError(negotiationType + " not handled"); 657 } 658 } 659 } 660 661 /** 662 * Creates Netty transports. Exposed for internal use, as it should be private. 663 */ 664 private static final class NettyTransportFactory implements ClientTransportFactory { 665 private final ProtocolNegotiator protocolNegotiator; 666 private final ChannelFactory<? extends Channel> channelFactory; 667 private final Map<ChannelOption<?>, ?> channelOptions; 668 private final ObjectPool<? extends EventLoopGroup> groupPool; 669 private final EventLoopGroup group; 670 private final boolean autoFlowControl; 671 private final int flowControlWindow; 672 private final int maxMessageSize; 673 private final int maxHeaderListSize; 674 private final long keepAliveTimeNanos; 675 private final AtomicBackoff keepAliveBackoff; 676 private final long keepAliveTimeoutNanos; 677 private final boolean keepAliveWithoutCalls; 678 private final TransportTracer.Factory transportTracerFactory; 679 private final LocalSocketPicker localSocketPicker; 680 private final boolean useGetForSafeMethods; 681 682 private boolean closed; 683 NettyTransportFactory( ProtocolNegotiator protocolNegotiator, ChannelFactory<? extends Channel> channelFactory, Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool, boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, boolean useGetForSafeMethods)684 NettyTransportFactory( 685 ProtocolNegotiator protocolNegotiator, 686 ChannelFactory<? extends Channel> channelFactory, 687 Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool, 688 boolean autoFlowControl, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, 689 long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, 690 TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, 691 boolean useGetForSafeMethods) { 692 this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); 693 this.channelFactory = channelFactory; 694 this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions); 695 this.groupPool = groupPool; 696 this.group = groupPool.getObject(); 697 this.autoFlowControl = autoFlowControl; 698 this.flowControlWindow = flowControlWindow; 699 this.maxMessageSize = maxMessageSize; 700 this.maxHeaderListSize = maxHeaderListSize; 701 this.keepAliveTimeNanos = keepAliveTimeNanos; 702 this.keepAliveBackoff = new AtomicBackoff("keepalive time nanos", keepAliveTimeNanos); 703 this.keepAliveTimeoutNanos = keepAliveTimeoutNanos; 704 this.keepAliveWithoutCalls = keepAliveWithoutCalls; 705 this.transportTracerFactory = transportTracerFactory; 706 this.localSocketPicker = 707 localSocketPicker != null ? localSocketPicker : new LocalSocketPicker(); 708 this.useGetForSafeMethods = useGetForSafeMethods; 709 } 710 711 @Override newClientTransport( SocketAddress serverAddress, ClientTransportOptions options, ChannelLogger channelLogger)712 public ConnectionClientTransport newClientTransport( 713 SocketAddress serverAddress, ClientTransportOptions options, ChannelLogger channelLogger) { 714 checkState(!closed, "The transport factory is closed."); 715 716 ProtocolNegotiator localNegotiator = protocolNegotiator; 717 HttpConnectProxiedSocketAddress proxiedAddr = options.getHttpConnectProxiedSocketAddress(); 718 if (proxiedAddr != null) { 719 serverAddress = proxiedAddr.getTargetAddress(); 720 localNegotiator = ProtocolNegotiators.httpProxy( 721 proxiedAddr.getProxyAddress(), 722 proxiedAddr.getUsername(), 723 proxiedAddr.getPassword(), 724 protocolNegotiator); 725 } 726 727 final AtomicBackoff.State keepAliveTimeNanosState = keepAliveBackoff.getState(); 728 Runnable tooManyPingsRunnable = new Runnable() { 729 @Override 730 public void run() { 731 keepAliveTimeNanosState.backoff(); 732 } 733 }; 734 735 // TODO(carl-mastrangelo): Pass channelLogger in. 736 NettyClientTransport transport = new NettyClientTransport( 737 serverAddress, channelFactory, channelOptions, group, 738 localNegotiator, autoFlowControl, flowControlWindow, 739 maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, 740 keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), 741 tooManyPingsRunnable, transportTracerFactory.create(), options.getEagAttributes(), 742 localSocketPicker, channelLogger, useGetForSafeMethods, Ticker.systemTicker()); 743 return transport; 744 } 745 746 @Override getScheduledExecutorService()747 public ScheduledExecutorService getScheduledExecutorService() { 748 return group; 749 } 750 751 @Override swapChannelCredentials(ChannelCredentials channelCreds)752 public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials channelCreds) { 753 checkNotNull(channelCreds, "channelCreds"); 754 FromChannelCredentialsResult result = ProtocolNegotiators.from(channelCreds); 755 if (result.error != null) { 756 return null; 757 } 758 ClientTransportFactory factory = new NettyTransportFactory( 759 result.negotiator.newNegotiator(), channelFactory, channelOptions, groupPool, 760 autoFlowControl, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanos, 761 keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory, localSocketPicker, 762 useGetForSafeMethods); 763 return new SwapChannelCredentialsResult(factory, result.callCredentials); 764 } 765 766 @Override close()767 public void close() { 768 if (closed) { 769 return; 770 } 771 closed = true; 772 773 protocolNegotiator.close(); 774 groupPool.returnObject(group); 775 } 776 } 777 } 778