1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.Preconditions; 23 import com.google.common.util.concurrent.MoreExecutors; 24 import io.grpc.Attributes; 25 import io.grpc.BinaryLog; 26 import io.grpc.ClientInterceptor; 27 import io.grpc.CompressorRegistry; 28 import io.grpc.DecompressorRegistry; 29 import io.grpc.EquivalentAddressGroup; 30 import io.grpc.InternalChannelz; 31 import io.grpc.LoadBalancer; 32 import io.grpc.ManagedChannel; 33 import io.grpc.ManagedChannelBuilder; 34 import io.grpc.NameResolver; 35 import io.grpc.NameResolverProvider; 36 import io.opencensus.trace.Tracing; 37 import java.net.SocketAddress; 38 import java.net.URI; 39 import java.net.URISyntaxException; 40 import java.util.ArrayList; 41 import java.util.Arrays; 42 import java.util.Collections; 43 import java.util.List; 44 import java.util.concurrent.Executor; 45 import java.util.concurrent.TimeUnit; 46 import javax.annotation.Nullable; 47 48 /** 49 * The base class for channel builders. 50 * 51 * @param <T> The concrete type of this builder. 52 */ 53 public abstract class AbstractManagedChannelImplBuilder 54 <T extends AbstractManagedChannelImplBuilder<T>> extends ManagedChannelBuilder<T> { 55 private static final String DIRECT_ADDRESS_SCHEME = "directaddress"; 56 forAddress(String name, int port)57 public static ManagedChannelBuilder<?> forAddress(String name, int port) { 58 throw new UnsupportedOperationException("Subclass failed to hide static factory"); 59 } 60 forTarget(String target)61 public static ManagedChannelBuilder<?> forTarget(String target) { 62 throw new UnsupportedOperationException("Subclass failed to hide static factory"); 63 } 64 65 /** 66 * An idle timeout larger than this would disable idle mode. 67 */ 68 @VisibleForTesting 69 static final long IDLE_MODE_MAX_TIMEOUT_DAYS = 30; 70 71 /** 72 * The default idle timeout. 73 */ 74 @VisibleForTesting 75 static final long IDLE_MODE_DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30); 76 77 /** 78 * An idle timeout smaller than this would be capped to it. 79 */ 80 @VisibleForTesting 81 static final long IDLE_MODE_MIN_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); 82 83 private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL = 84 SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR); 85 86 private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY = 87 NameResolverProvider.asFactory(); 88 89 private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = 90 DecompressorRegistry.getDefaultInstance(); 91 92 private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY = 93 CompressorRegistry.getDefaultInstance(); 94 95 private static final long DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES = 1L << 24; // 16M 96 private static final long DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES = 1L << 20; // 1M 97 98 ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL; 99 100 private final List<ClientInterceptor> interceptors = new ArrayList<>(); 101 102 // Access via getter, which may perform authority override as needed 103 private NameResolver.Factory nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY; 104 105 final String target; 106 107 @Nullable 108 private final SocketAddress directServerAddress; 109 110 @Nullable 111 String userAgent; 112 113 @VisibleForTesting 114 @Nullable 115 String authorityOverride; 116 117 118 @Nullable LoadBalancer.Factory loadBalancerFactory; 119 120 boolean fullStreamDecompression; 121 122 DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; 123 124 CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; 125 126 long idleTimeoutMillis = IDLE_MODE_DEFAULT_TIMEOUT_MILLIS; 127 128 int maxRetryAttempts = 5; 129 int maxHedgedAttempts = 5; 130 long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES; 131 long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES; 132 boolean retryEnabled = false; // TODO(zdapeng): default to true 133 // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know 134 // what should be the desired behavior for retry + stats/tracing. 135 // TODO(zdapeng): delete me 136 boolean temporarilyDisableRetry; 137 138 InternalChannelz channelz = InternalChannelz.instance(); 139 int maxTraceEvents; 140 141 protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory(); 142 143 private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; 144 145 @Nullable 146 BinaryLog binlog; 147 148 /** 149 * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages 150 * larger than this limit is received it will not be processed and the RPC will fail with 151 * RESOURCE_EXHAUSTED. 152 */ 153 // Can be overridden by subclasses. 154 @Override maxInboundMessageSize(int max)155 public T maxInboundMessageSize(int max) { 156 checkArgument(max >= 0, "negative max"); 157 maxInboundMessageSize = max; 158 return thisT(); 159 } 160 maxInboundMessageSize()161 protected final int maxInboundMessageSize() { 162 return maxInboundMessageSize; 163 } 164 165 private boolean statsEnabled = true; 166 private boolean recordStartedRpcs = true; 167 private boolean recordFinishedRpcs = true; 168 private boolean tracingEnabled = true; 169 170 @Nullable 171 private CensusStatsModule censusStatsOverride; 172 AbstractManagedChannelImplBuilder(String target)173 protected AbstractManagedChannelImplBuilder(String target) { 174 this.target = Preconditions.checkNotNull(target, "target"); 175 this.directServerAddress = null; 176 } 177 178 /** 179 * Returns a target string for the SocketAddress. It is only used as a placeholder, because 180 * DirectAddressNameResolverFactory will not actually try to use it. However, it must be a valid 181 * URI. 182 */ 183 @VisibleForTesting makeTargetStringForDirectAddress(SocketAddress address)184 static String makeTargetStringForDirectAddress(SocketAddress address) { 185 try { 186 return new URI(DIRECT_ADDRESS_SCHEME, "", "/" + address, null).toString(); 187 } catch (URISyntaxException e) { 188 // It should not happen. 189 throw new RuntimeException(e); 190 } 191 } 192 AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority)193 protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority) { 194 this.target = makeTargetStringForDirectAddress(directServerAddress); 195 this.directServerAddress = directServerAddress; 196 this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority); 197 } 198 199 @Override directExecutor()200 public final T directExecutor() { 201 return executor(MoreExecutors.directExecutor()); 202 } 203 204 @Override executor(Executor executor)205 public final T executor(Executor executor) { 206 if (executor != null) { 207 this.executorPool = new FixedObjectPool<Executor>(executor); 208 } else { 209 this.executorPool = DEFAULT_EXECUTOR_POOL; 210 } 211 return thisT(); 212 } 213 214 @Override intercept(List<ClientInterceptor> interceptors)215 public final T intercept(List<ClientInterceptor> interceptors) { 216 this.interceptors.addAll(interceptors); 217 return thisT(); 218 } 219 220 @Override intercept(ClientInterceptor... interceptors)221 public final T intercept(ClientInterceptor... interceptors) { 222 return intercept(Arrays.asList(interceptors)); 223 } 224 225 @Override nameResolverFactory(NameResolver.Factory resolverFactory)226 public final T nameResolverFactory(NameResolver.Factory resolverFactory) { 227 Preconditions.checkState(directServerAddress == null, 228 "directServerAddress is set (%s), which forbids the use of NameResolverFactory", 229 directServerAddress); 230 if (resolverFactory != null) { 231 this.nameResolverFactory = resolverFactory; 232 } else { 233 this.nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY; 234 } 235 return thisT(); 236 } 237 238 @Override loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory)239 public final T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory) { 240 Preconditions.checkState(directServerAddress == null, 241 "directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory", 242 directServerAddress); 243 this.loadBalancerFactory = loadBalancerFactory; 244 return thisT(); 245 } 246 247 @Override enableFullStreamDecompression()248 public final T enableFullStreamDecompression() { 249 this.fullStreamDecompression = true; 250 return thisT(); 251 } 252 253 @Override decompressorRegistry(DecompressorRegistry registry)254 public final T decompressorRegistry(DecompressorRegistry registry) { 255 if (registry != null) { 256 this.decompressorRegistry = registry; 257 } else { 258 this.decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; 259 } 260 return thisT(); 261 } 262 263 @Override compressorRegistry(CompressorRegistry registry)264 public final T compressorRegistry(CompressorRegistry registry) { 265 if (registry != null) { 266 this.compressorRegistry = registry; 267 } else { 268 this.compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; 269 } 270 return thisT(); 271 } 272 273 @Override userAgent(@ullable String userAgent)274 public final T userAgent(@Nullable String userAgent) { 275 this.userAgent = userAgent; 276 return thisT(); 277 } 278 279 @Override overrideAuthority(String authority)280 public final T overrideAuthority(String authority) { 281 this.authorityOverride = checkAuthority(authority); 282 return thisT(); 283 } 284 285 @Override idleTimeout(long value, TimeUnit unit)286 public final T idleTimeout(long value, TimeUnit unit) { 287 checkArgument(value > 0, "idle timeout is %s, but must be positive", value); 288 // We convert to the largest unit to avoid overflow 289 if (unit.toDays(value) >= IDLE_MODE_MAX_TIMEOUT_DAYS) { 290 // This disables idle mode 291 this.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE; 292 } else { 293 this.idleTimeoutMillis = Math.max(unit.toMillis(value), IDLE_MODE_MIN_TIMEOUT_MILLIS); 294 } 295 return thisT(); 296 } 297 298 @Override maxRetryAttempts(int maxRetryAttempts)299 public final T maxRetryAttempts(int maxRetryAttempts) { 300 this.maxRetryAttempts = maxRetryAttempts; 301 return thisT(); 302 } 303 304 @Override maxHedgedAttempts(int maxHedgedAttempts)305 public final T maxHedgedAttempts(int maxHedgedAttempts) { 306 this.maxHedgedAttempts = maxHedgedAttempts; 307 return thisT(); 308 } 309 310 @Override retryBufferSize(long bytes)311 public final T retryBufferSize(long bytes) { 312 checkArgument(bytes > 0L, "retry buffer size must be positive"); 313 retryBufferSize = bytes; 314 return thisT(); 315 } 316 317 @Override perRpcBufferLimit(long bytes)318 public final T perRpcBufferLimit(long bytes) { 319 checkArgument(bytes > 0L, "per RPC buffer limit must be positive"); 320 perRpcBufferLimit = bytes; 321 return thisT(); 322 } 323 324 @Override disableRetry()325 public final T disableRetry() { 326 retryEnabled = false; 327 return thisT(); 328 } 329 330 @Override enableRetry()331 public final T enableRetry() { 332 retryEnabled = true; 333 return thisT(); 334 } 335 336 @Override setBinaryLog(BinaryLog binlog)337 public final T setBinaryLog(BinaryLog binlog) { 338 this.binlog = binlog; 339 return thisT(); 340 } 341 342 @Override maxTraceEvents(int maxTraceEvents)343 public T maxTraceEvents(int maxTraceEvents) { 344 checkArgument(maxTraceEvents >= 0, "maxTraceEvents must be non-negative"); 345 this.maxTraceEvents = maxTraceEvents; 346 return thisT(); 347 } 348 349 /** 350 * Override the default stats implementation. 351 */ 352 @VisibleForTesting overrideCensusStatsModule(CensusStatsModule censusStats)353 protected final T overrideCensusStatsModule(CensusStatsModule censusStats) { 354 this.censusStatsOverride = censusStats; 355 return thisT(); 356 } 357 358 /** 359 * Disable or enable stats features. Enabled by default. 360 */ setStatsEnabled(boolean value)361 protected void setStatsEnabled(boolean value) { 362 statsEnabled = value; 363 } 364 365 /** 366 * Disable or enable stats recording for RPC upstarts. Effective only if {@link 367 * #setStatsEnabled} is set to true. Enabled by default. 368 */ setStatsRecordStartedRpcs(boolean value)369 protected void setStatsRecordStartedRpcs(boolean value) { 370 recordStartedRpcs = value; 371 } 372 373 /** 374 * Disable or enable stats recording for RPC completions. Effective only if {@link 375 * #setStatsEnabled} is set to true. Enabled by default. 376 */ setStatsRecordFinishedRpcs(boolean value)377 protected void setStatsRecordFinishedRpcs(boolean value) { 378 recordFinishedRpcs = value; 379 } 380 381 /** 382 * Disable or enable tracing features. Enabled by default. 383 */ setTracingEnabled(boolean value)384 protected void setTracingEnabled(boolean value) { 385 tracingEnabled = value; 386 } 387 388 @VisibleForTesting getIdleTimeoutMillis()389 final long getIdleTimeoutMillis() { 390 return idleTimeoutMillis; 391 } 392 393 /** 394 * Verifies the authority is valid. This method exists as an escape hatch for putting in an 395 * authority that is valid, but would fail the default validation provided by this 396 * implementation. 397 */ checkAuthority(String authority)398 protected String checkAuthority(String authority) { 399 return GrpcUtil.checkAuthority(authority); 400 } 401 402 @Override build()403 public ManagedChannel build() { 404 return new ManagedChannelOrphanWrapper(new ManagedChannelImpl( 405 this, 406 buildTransportFactory(), 407 // TODO(carl-mastrangelo): Allow clients to pass this in 408 new ExponentialBackoffPolicy.Provider(), 409 SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR), 410 GrpcUtil.STOPWATCH_SUPPLIER, 411 getEffectiveInterceptors(), 412 TimeProvider.SYSTEM_TIME_PROVIDER)); 413 } 414 415 // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know 416 // what should be the desired behavior for retry + stats/tracing. 417 // TODO(zdapeng): FIX IT 418 @VisibleForTesting getEffectiveInterceptors()419 final List<ClientInterceptor> getEffectiveInterceptors() { 420 List<ClientInterceptor> effectiveInterceptors = 421 new ArrayList<>(this.interceptors); 422 temporarilyDisableRetry = false; 423 if (statsEnabled) { 424 temporarilyDisableRetry = true; 425 CensusStatsModule censusStats = this.censusStatsOverride; 426 if (censusStats == null) { 427 censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); 428 } 429 // First interceptor runs last (see ClientInterceptors.intercept()), so that no 430 // other interceptor can override the tracer factory we set in CallOptions. 431 effectiveInterceptors.add( 432 0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs)); 433 } 434 if (tracingEnabled) { 435 temporarilyDisableRetry = true; 436 CensusTracingModule censusTracing = 437 new CensusTracingModule(Tracing.getTracer(), 438 Tracing.getPropagationComponent().getBinaryFormat()); 439 effectiveInterceptors.add(0, censusTracing.getClientInterceptor()); 440 } 441 return effectiveInterceptors; 442 } 443 444 /** 445 * Subclasses should override this method to provide the {@link ClientTransportFactory} 446 * appropriate for this channel. This method is meant for Transport implementors and should not 447 * be used by normal users. 448 */ buildTransportFactory()449 protected abstract ClientTransportFactory buildTransportFactory(); 450 451 /** 452 * Subclasses can override this method to provide additional parameters to {@link 453 * NameResolver.Factory#newNameResolver}. The default implementation returns {@link 454 * Attributes#EMPTY}. 455 */ getNameResolverParams()456 protected Attributes getNameResolverParams() { 457 return Attributes.EMPTY; 458 } 459 460 /** 461 * Returns a {@link NameResolver.Factory} for the channel. 462 */ getNameResolverFactory()463 NameResolver.Factory getNameResolverFactory() { 464 if (authorityOverride == null) { 465 return nameResolverFactory; 466 } else { 467 return new OverrideAuthorityNameResolverFactory(nameResolverFactory, authorityOverride); 468 } 469 } 470 471 private static class DirectAddressNameResolverFactory extends NameResolver.Factory { 472 final SocketAddress address; 473 final String authority; 474 DirectAddressNameResolverFactory(SocketAddress address, String authority)475 DirectAddressNameResolverFactory(SocketAddress address, String authority) { 476 this.address = address; 477 this.authority = authority; 478 } 479 480 @Override newNameResolver(URI notUsedUri, Attributes params)481 public NameResolver newNameResolver(URI notUsedUri, Attributes params) { 482 return new NameResolver() { 483 @Override 484 public String getServiceAuthority() { 485 return authority; 486 } 487 488 @Override 489 public void start(final Listener listener) { 490 listener.onAddresses( 491 Collections.singletonList(new EquivalentAddressGroup(address)), 492 Attributes.EMPTY); 493 } 494 495 @Override 496 public void shutdown() {} 497 }; 498 } 499 500 @Override getDefaultScheme()501 public String getDefaultScheme() { 502 return DIRECT_ADDRESS_SCHEME; 503 } 504 } 505 506 /** 507 * Returns the correctly typed version of the builder. 508 */ 509 private T thisT() { 510 @SuppressWarnings("unchecked") 511 T thisT = (T) this; 512 return thisT; 513 } 514 } 515