1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.Objects; 24 import com.google.common.base.Preconditions; 25 import com.google.common.base.Splitter; 26 import com.google.common.base.Stopwatch; 27 import com.google.common.base.Supplier; 28 import com.google.common.util.concurrent.ListenableFuture; 29 import com.google.common.util.concurrent.ThreadFactoryBuilder; 30 import io.grpc.CallOptions; 31 import io.grpc.ClientStreamTracer; 32 import io.grpc.ClientStreamTracer.StreamInfo; 33 import io.grpc.InternalChannelz.SocketStats; 34 import io.grpc.InternalLogId; 35 import io.grpc.InternalMetadata; 36 import io.grpc.InternalMetadata.TrustedAsciiMarshaller; 37 import io.grpc.LoadBalancer.PickResult; 38 import io.grpc.LoadBalancer.Subchannel; 39 import io.grpc.Metadata; 40 import io.grpc.MethodDescriptor; 41 import io.grpc.ProxiedSocketAddress; 42 import io.grpc.ProxyDetector; 43 import io.grpc.Status; 44 import io.grpc.internal.ClientStreamListener.RpcProgress; 45 import io.grpc.internal.SharedResourceHolder.Resource; 46 import io.grpc.internal.StreamListener.MessageProducer; 47 import java.io.Closeable; 48 import java.io.IOException; 49 import java.io.InputStream; 50 import java.lang.reflect.InvocationTargetException; 51 import java.lang.reflect.Method; 52 import java.net.HttpURLConnection; 53 import java.net.InetSocketAddress; 54 import java.net.SocketAddress; 55 import java.net.URI; 56 import java.net.URISyntaxException; 57 import java.nio.charset.Charset; 58 import java.util.Collection; 59 import java.util.Collections; 60 import java.util.EnumSet; 61 import java.util.List; 62 import java.util.Locale; 63 import java.util.Set; 64 import java.util.concurrent.Executor; 65 import java.util.concurrent.ExecutorService; 66 import java.util.concurrent.Executors; 67 import java.util.concurrent.ScheduledExecutorService; 68 import java.util.concurrent.ThreadFactory; 69 import java.util.concurrent.TimeUnit; 70 import java.util.logging.Level; 71 import java.util.logging.Logger; 72 import javax.annotation.Nullable; 73 import javax.annotation.concurrent.Immutable; 74 75 /** 76 * Common utilities for GRPC. 77 */ 78 public final class GrpcUtil { 79 80 private static final Logger log = Logger.getLogger(GrpcUtil.class.getName()); 81 82 private static final Set<Status.Code> INAPPROPRIATE_CONTROL_PLANE_STATUS 83 = Collections.unmodifiableSet(EnumSet.of( 84 Status.Code.OK, 85 Status.Code.INVALID_ARGUMENT, 86 Status.Code.NOT_FOUND, 87 Status.Code.ALREADY_EXISTS, 88 Status.Code.FAILED_PRECONDITION, 89 Status.Code.ABORTED, 90 Status.Code.OUT_OF_RANGE, 91 Status.Code.DATA_LOSS)); 92 93 public static final Charset US_ASCII = Charset.forName("US-ASCII"); 94 95 /** 96 * {@link io.grpc.Metadata.Key} for the timeout header. 97 */ 98 public static final Metadata.Key<Long> TIMEOUT_KEY = 99 Metadata.Key.of(GrpcUtil.TIMEOUT, new TimeoutMarshaller()); 100 101 /** 102 * {@link io.grpc.Metadata.Key} for the message encoding header. 103 */ 104 public static final Metadata.Key<String> MESSAGE_ENCODING_KEY = 105 Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER); 106 107 /** 108 * {@link io.grpc.Metadata.Key} for the accepted message encodings header. 109 */ 110 public static final Metadata.Key<byte[]> MESSAGE_ACCEPT_ENCODING_KEY = 111 InternalMetadata.keyOf(GrpcUtil.MESSAGE_ACCEPT_ENCODING, new AcceptEncodingMarshaller()); 112 113 /** 114 * {@link io.grpc.Metadata.Key} for the stream's content encoding header. 115 */ 116 public static final Metadata.Key<String> CONTENT_ENCODING_KEY = 117 Metadata.Key.of(GrpcUtil.CONTENT_ENCODING, Metadata.ASCII_STRING_MARSHALLER); 118 119 /** 120 * {@link io.grpc.Metadata.Key} for the stream's accepted content encoding header. 121 */ 122 public static final Metadata.Key<byte[]> CONTENT_ACCEPT_ENCODING_KEY = 123 InternalMetadata.keyOf(GrpcUtil.CONTENT_ACCEPT_ENCODING, new AcceptEncodingMarshaller()); 124 125 static final Metadata.Key<String> CONTENT_LENGTH_KEY = 126 Metadata.Key.of("content-length", Metadata.ASCII_STRING_MARSHALLER); 127 128 private static final class AcceptEncodingMarshaller implements TrustedAsciiMarshaller<byte[]> { 129 @Override toAsciiString(byte[] value)130 public byte[] toAsciiString(byte[] value) { 131 return value; 132 } 133 134 @Override parseAsciiString(byte[] serialized)135 public byte[] parseAsciiString(byte[] serialized) { 136 return serialized; 137 } 138 } 139 140 /** 141 * {@link io.grpc.Metadata.Key} for the Content-Type request/response header. 142 */ 143 public static final Metadata.Key<String> CONTENT_TYPE_KEY = 144 Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER); 145 146 /** 147 * {@link io.grpc.Metadata.Key} for the Transfer encoding. 148 */ 149 public static final Metadata.Key<String> TE_HEADER = 150 Metadata.Key.of("te", Metadata.ASCII_STRING_MARSHALLER); 151 152 /** 153 * {@link io.grpc.Metadata.Key} for the Content-Type request/response header. 154 */ 155 public static final Metadata.Key<String> USER_AGENT_KEY = 156 Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER); 157 158 /** 159 * The default port for plain-text connections. 160 */ 161 public static final int DEFAULT_PORT_PLAINTEXT = 80; 162 163 /** 164 * The default port for SSL connections. 165 */ 166 public static final int DEFAULT_PORT_SSL = 443; 167 168 /** 169 * Content-Type used for GRPC-over-HTTP/2. 170 */ 171 public static final String CONTENT_TYPE_GRPC = "application/grpc"; 172 173 /** 174 * The HTTP method used for GRPC requests. 175 */ 176 public static final String HTTP_METHOD = "POST"; 177 178 /** 179 * The TE (transport encoding) header for requests over HTTP/2. 180 */ 181 public static final String TE_TRAILERS = "trailers"; 182 183 /** 184 * The Timeout header name. 185 */ 186 public static final String TIMEOUT = "grpc-timeout"; 187 188 /** 189 * The message encoding (i.e. compression) that can be used in the stream. 190 */ 191 public static final String MESSAGE_ENCODING = "grpc-encoding"; 192 193 /** 194 * The accepted message encodings (i.e. compression) that can be used in the stream. 195 */ 196 public static final String MESSAGE_ACCEPT_ENCODING = "grpc-accept-encoding"; 197 198 /** 199 * The content-encoding used to compress the full gRPC stream. 200 */ 201 public static final String CONTENT_ENCODING = "content-encoding"; 202 203 /** 204 * The accepted content-encodings that can be used to compress the full gRPC stream. 205 */ 206 public static final String CONTENT_ACCEPT_ENCODING = "accept-encoding"; 207 208 /** 209 * The default maximum uncompressed size (in bytes) for inbound messages. Defaults to 4 MiB. 210 */ 211 public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024; 212 213 /** 214 * The default maximum size (in bytes) for inbound header/trailer. 215 */ 216 // Update documentation in public-facing Builders when changing this value. 217 public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192; 218 219 public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults(); 220 221 private static final String IMPLEMENTATION_VERSION = "1.56.1-SNAPSHOT"; // CURRENT_GRPC_VERSION 222 223 /** 224 * The default timeout in nanos for a keepalive ping request. 225 */ 226 public static final long DEFAULT_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L); 227 228 /** 229 * The magic keepalive time value that disables client keepalive. 230 */ 231 public static final long KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE; 232 233 /** 234 * The default delay in nanos for server keepalive. 235 */ 236 public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L); 237 238 /** 239 * The default timeout in nanos for a server keepalive ping request. 240 */ 241 public static final long DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L); 242 243 /** 244 * The magic keepalive time value that disables keepalive. 245 */ 246 public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE; 247 248 /** 249 * The default proxy detector. 250 */ 251 public static final ProxyDetector DEFAULT_PROXY_DETECTOR = new ProxyDetectorImpl(); 252 253 /** 254 * A proxy detector that always claims no proxy is needed. 255 */ 256 public static final ProxyDetector NOOP_PROXY_DETECTOR = new ProxyDetector() { 257 @Nullable 258 @Override 259 public ProxiedSocketAddress proxyFor(SocketAddress targetServerAddress) { 260 return null; 261 } 262 }; 263 264 /** 265 * The very default load-balancing policy. 266 */ 267 public static final String DEFAULT_LB_POLICY = "pick_first"; 268 269 /** 270 * RPCs created on the Channel returned by {@link io.grpc.LoadBalancer.Subchannel#asChannel} 271 * will have this option with value {@code true}. They will be treated differently from 272 * the ones created by application. 273 */ 274 public static final CallOptions.Key<Boolean> CALL_OPTIONS_RPC_OWNED_BY_BALANCER = 275 CallOptions.Key.create("io.grpc.internal.CALL_OPTIONS_RPC_OWNED_BY_BALANCER"); 276 277 private static final ClientStreamTracer NOOP_TRACER = new ClientStreamTracer() {}; 278 279 /** 280 * Returns true if an RPC with the given properties should be counted when calculating the 281 * in-use state of a transport. 282 */ shouldBeCountedForInUse(CallOptions callOptions)283 public static boolean shouldBeCountedForInUse(CallOptions callOptions) { 284 return !Boolean.TRUE.equals(callOptions.getOption(CALL_OPTIONS_RPC_OWNED_BY_BALANCER)); 285 } 286 287 /** 288 * Maps HTTP error response status codes to transport codes, as defined in <a 289 * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md"> 290 * http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is 291 * {@code true}. 292 */ httpStatusToGrpcStatus(int httpStatusCode)293 public static Status httpStatusToGrpcStatus(int httpStatusCode) { 294 return httpStatusToGrpcCode(httpStatusCode).toStatus() 295 .withDescription("HTTP status code " + httpStatusCode); 296 } 297 httpStatusToGrpcCode(int httpStatusCode)298 private static Status.Code httpStatusToGrpcCode(int httpStatusCode) { 299 if (httpStatusCode >= 100 && httpStatusCode < 200) { 300 // 1xx. These headers should have been ignored. 301 return Status.Code.INTERNAL; 302 } 303 switch (httpStatusCode) { 304 case HttpURLConnection.HTTP_BAD_REQUEST: // 400 305 case 431: // Request Header Fields Too Large 306 // TODO(carl-mastrangelo): this should be added to the http-grpc-status-mapping.md doc. 307 return Status.Code.INTERNAL; 308 case HttpURLConnection.HTTP_UNAUTHORIZED: // 401 309 return Status.Code.UNAUTHENTICATED; 310 case HttpURLConnection.HTTP_FORBIDDEN: // 403 311 return Status.Code.PERMISSION_DENIED; 312 case HttpURLConnection.HTTP_NOT_FOUND: // 404 313 return Status.Code.UNIMPLEMENTED; 314 case 429: // Too Many Requests 315 case HttpURLConnection.HTTP_BAD_GATEWAY: // 502 316 case HttpURLConnection.HTTP_UNAVAILABLE: // 503 317 case HttpURLConnection.HTTP_GATEWAY_TIMEOUT: // 504 318 return Status.Code.UNAVAILABLE; 319 default: 320 return Status.Code.UNKNOWN; 321 } 322 } 323 324 /** 325 * All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames. 326 */ 327 public enum Http2Error { 328 /** 329 * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with 330 * {@code NO_ERROR}. In this case it is important to indicate to the application that the 331 * request should be retried (i.e. {@link Status#UNAVAILABLE}). 332 */ 333 NO_ERROR(0x0, Status.UNAVAILABLE), 334 PROTOCOL_ERROR(0x1, Status.INTERNAL), 335 INTERNAL_ERROR(0x2, Status.INTERNAL), 336 FLOW_CONTROL_ERROR(0x3, Status.INTERNAL), 337 SETTINGS_TIMEOUT(0x4, Status.INTERNAL), 338 STREAM_CLOSED(0x5, Status.INTERNAL), 339 FRAME_SIZE_ERROR(0x6, Status.INTERNAL), 340 REFUSED_STREAM(0x7, Status.UNAVAILABLE), 341 CANCEL(0x8, Status.CANCELLED), 342 COMPRESSION_ERROR(0x9, Status.INTERNAL), 343 CONNECT_ERROR(0xA, Status.INTERNAL), 344 ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")), 345 INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as " 346 + "protocol is not secure enough to call")), 347 HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN); 348 349 // Populate a mapping of code to enum value for quick look-up. 350 private static final Http2Error[] codeMap = buildHttp2CodeMap(); 351 buildHttp2CodeMap()352 private static Http2Error[] buildHttp2CodeMap() { 353 Http2Error[] errors = Http2Error.values(); 354 int size = (int) errors[errors.length - 1].code() + 1; 355 Http2Error[] http2CodeMap = new Http2Error[size]; 356 for (Http2Error error : errors) { 357 int index = (int) error.code(); 358 http2CodeMap[index] = error; 359 } 360 return http2CodeMap; 361 } 362 363 private final int code; 364 // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true 365 // when there are exceptions in the Status, which is not true here. 366 @SuppressWarnings("ImmutableEnumChecker") 367 private final Status status; 368 Http2Error(int code, Status status)369 Http2Error(int code, Status status) { 370 this.code = code; 371 String description = "HTTP/2 error code: " + this.name(); 372 if (status.getDescription() != null) { 373 description += " (" + status.getDescription() + ")"; 374 } 375 this.status = status.withDescription(description); 376 } 377 378 /** 379 * Gets the code for this error used on the wire. 380 */ code()381 public long code() { 382 return code; 383 } 384 385 /** 386 * Gets the {@link Status} associated with this HTTP/2 code. 387 */ status()388 public Status status() { 389 return status; 390 } 391 392 /** 393 * Looks up the HTTP/2 error code enum value for the specified code. 394 * 395 * @param code an HTTP/2 error code value. 396 * @return the HTTP/2 error code enum or {@code null} if not found. 397 */ forCode(long code)398 public static Http2Error forCode(long code) { 399 if (code >= codeMap.length || code < 0) { 400 return null; 401 } 402 return codeMap[(int) code]; 403 } 404 405 /** 406 * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code 407 * forCode(code).status()}, to more easily conform to HTTP/2: 408 * 409 * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior. 410 * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote> 411 * 412 * @param code the HTTP/2 error code. 413 * @return a {@link Status} representing the given error. 414 */ statusForCode(long code)415 public static Status statusForCode(long code) { 416 Http2Error error = forCode(code); 417 if (error == null) { 418 // This "forgets" the message of INTERNAL_ERROR while keeping the same status code. 419 Status.Code statusCode = INTERNAL_ERROR.status().getCode(); 420 return Status.fromCodeValue(statusCode.value()) 421 .withDescription("Unrecognized HTTP/2 error code: " + code); 422 } 423 424 return error.status(); 425 } 426 } 427 428 /** 429 * Indicates whether or not the given value is a valid gRPC content-type. 430 */ isGrpcContentType(String contentType)431 public static boolean isGrpcContentType(String contentType) { 432 if (contentType == null) { 433 return false; 434 } 435 436 if (CONTENT_TYPE_GRPC.length() > contentType.length()) { 437 return false; 438 } 439 440 contentType = contentType.toLowerCase(Locale.US); 441 if (!contentType.startsWith(CONTENT_TYPE_GRPC)) { 442 // Not a gRPC content-type. 443 return false; 444 } 445 446 if (contentType.length() == CONTENT_TYPE_GRPC.length()) { 447 // The strings match exactly. 448 return true; 449 } 450 451 // The contentType matches, but is longer than the expected string. 452 // We need to support variations on the content-type (e.g. +proto, +json) as defined by the 453 // gRPC wire spec. 454 char nextChar = contentType.charAt(CONTENT_TYPE_GRPC.length()); 455 return nextChar == '+' || nextChar == ';'; 456 } 457 458 /** 459 * Gets the User-Agent string for the gRPC transport. 460 */ getGrpcUserAgent( String transportName, @Nullable String applicationUserAgent)461 public static String getGrpcUserAgent( 462 String transportName, @Nullable String applicationUserAgent) { 463 StringBuilder builder = new StringBuilder(); 464 if (applicationUserAgent != null) { 465 builder.append(applicationUserAgent); 466 builder.append(' '); 467 } 468 builder.append("grpc-java-"); 469 builder.append(transportName); 470 builder.append('/'); 471 builder.append(IMPLEMENTATION_VERSION); 472 return builder.toString(); 473 } 474 475 @Immutable 476 public static final class GrpcBuildVersion { 477 private final String userAgent; 478 private final String implementationVersion; 479 GrpcBuildVersion(String userAgent, String implementationVersion)480 private GrpcBuildVersion(String userAgent, String implementationVersion) { 481 this.userAgent = Preconditions.checkNotNull(userAgent, "userAgentName"); 482 this.implementationVersion = 483 Preconditions.checkNotNull(implementationVersion, "implementationVersion"); 484 } 485 getUserAgent()486 public String getUserAgent() { 487 return userAgent; 488 } 489 getImplementationVersion()490 public String getImplementationVersion() { 491 return implementationVersion; 492 } 493 494 @Override toString()495 public String toString() { 496 return userAgent + " " + implementationVersion; 497 } 498 } 499 500 /** 501 * Returns the build version of gRPC. 502 */ getGrpcBuildVersion()503 public static GrpcBuildVersion getGrpcBuildVersion() { 504 return new GrpcBuildVersion("gRPC Java", IMPLEMENTATION_VERSION); 505 } 506 507 /** 508 * Parse an authority into a URI for retrieving the host and port. 509 */ authorityToUri(String authority)510 public static URI authorityToUri(String authority) { 511 Preconditions.checkNotNull(authority, "authority"); 512 URI uri; 513 try { 514 uri = new URI(null, authority, null, null, null); 515 } catch (URISyntaxException ex) { 516 throw new IllegalArgumentException("Invalid authority: " + authority, ex); 517 } 518 return uri; 519 } 520 521 /** 522 * Verify {@code authority} is valid for use with gRPC. The syntax must be valid and it must not 523 * include userinfo. 524 * 525 * @return the {@code authority} provided 526 */ checkAuthority(String authority)527 public static String checkAuthority(String authority) { 528 URI uri = authorityToUri(authority); 529 checkArgument(uri.getHost() != null, "No host in authority '%s'", authority); 530 checkArgument(uri.getUserInfo() == null, 531 "Userinfo must not be present on authority: '%s'", authority); 532 return authority; 533 } 534 535 /** 536 * Combine a host and port into an authority string. 537 */ 538 // There is a copy of this method in io.grpc.Grpc authorityFromHostAndPort(String host, int port)539 public static String authorityFromHostAndPort(String host, int port) { 540 try { 541 return new URI(null, null, host, port, null, null, null).getAuthority(); 542 } catch (URISyntaxException ex) { 543 throw new IllegalArgumentException("Invalid host or port: " + host + " " + port, ex); 544 } 545 } 546 547 /** 548 * Shared executor for channels. 549 */ 550 public static final Resource<Executor> SHARED_CHANNEL_EXECUTOR = 551 new Resource<Executor>() { 552 private static final String NAME = "grpc-default-executor"; 553 @Override 554 public Executor create() { 555 return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true)); 556 } 557 558 @Override 559 public void close(Executor instance) { 560 ((ExecutorService) instance).shutdown(); 561 } 562 563 @Override 564 public String toString() { 565 return NAME; 566 } 567 }; 568 569 /** 570 * Shared single-threaded executor for managing channel timers. 571 */ 572 public static final Resource<ScheduledExecutorService> TIMER_SERVICE = 573 new Resource<ScheduledExecutorService>() { 574 @Override 575 public ScheduledExecutorService create() { 576 // We don't use newSingleThreadScheduledExecutor because it doesn't return a 577 // ScheduledThreadPoolExecutor. 578 ScheduledExecutorService service = Executors.newScheduledThreadPool( 579 1, 580 getThreadFactory("grpc-timer-%d", true)); 581 582 // If there are long timeouts that are cancelled, they will not actually be removed from 583 // the executors queue. This forces immediate removal upon cancellation to avoid a 584 // memory leak. Reflection is used because we cannot use methods added in Java 1.7. If 585 // the method does not exist, we give up. Note that the method is not present in 1.6, but 586 // _is_ present in the android standard library. 587 try { 588 Method method = service.getClass().getMethod("setRemoveOnCancelPolicy", boolean.class); 589 method.invoke(service, true); 590 } catch (NoSuchMethodException e) { 591 // no op 592 } catch (RuntimeException e) { 593 throw e; 594 } catch (Exception e) { 595 throw new RuntimeException(e); 596 } 597 598 return Executors.unconfigurableScheduledExecutorService(service); 599 } 600 601 @Override 602 public void close(ScheduledExecutorService instance) { 603 instance.shutdown(); 604 } 605 }; 606 607 608 /** 609 * Get a {@link ThreadFactory} suitable for use in the current environment. 610 * @param nameFormat to apply to threads created by the factory. 611 * @param daemon {@code true} if the threads the factory creates are daemon threads, {@code false} 612 * otherwise. 613 * @return a {@link ThreadFactory}. 614 */ getThreadFactory(String nameFormat, boolean daemon)615 public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) { 616 return new ThreadFactoryBuilder() 617 .setDaemon(daemon) 618 .setNameFormat(nameFormat) 619 .build(); 620 } 621 622 /** 623 * The factory of default Stopwatches. 624 */ 625 public static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() { 626 @Override 627 public Stopwatch get() { 628 return Stopwatch.createUnstarted(); 629 } 630 }; 631 632 /** 633 * Returns the host via {@link InetSocketAddress#getHostString} if it is possible, 634 * i.e. in jdk >= 7. 635 * Otherwise, return it via {@link InetSocketAddress#getHostName} which may incur a DNS lookup. 636 */ getHost(InetSocketAddress addr)637 public static String getHost(InetSocketAddress addr) { 638 try { 639 Method getHostStringMethod = InetSocketAddress.class.getMethod("getHostString"); 640 return (String) getHostStringMethod.invoke(addr); 641 } catch (NoSuchMethodException e) { 642 // noop 643 } catch (IllegalAccessException e) { 644 // noop 645 } catch (InvocationTargetException e) { 646 // noop 647 } 648 return addr.getHostName(); 649 } 650 651 /** 652 * Marshals a nanoseconds representation of the timeout to and from a string representation, 653 * consisting of an ASCII decimal representation of a number with at most 8 digits, followed by a 654 * unit. Available units: 655 * n = nanoseconds 656 * u = microseconds 657 * m = milliseconds 658 * S = seconds 659 * M = minutes 660 * H = hours 661 * 662 * <p>The representation is greedy with respect to precision. That is, 2 seconds will be 663 * represented as `2000000u`.</p> 664 * 665 * <p>See <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">the 666 * request header definition</a></p> 667 */ 668 @VisibleForTesting 669 static class TimeoutMarshaller implements Metadata.AsciiMarshaller<Long> { 670 671 @Override toAsciiString(Long timeoutNanos)672 public String toAsciiString(Long timeoutNanos) { 673 long cutoff = 100000000; 674 TimeUnit unit = TimeUnit.NANOSECONDS; 675 if (timeoutNanos < 0) { 676 throw new IllegalArgumentException("Timeout too small"); 677 } else if (timeoutNanos < cutoff) { 678 return timeoutNanos + "n"; 679 } else if (timeoutNanos < cutoff * 1000L) { 680 return unit.toMicros(timeoutNanos) + "u"; 681 } else if (timeoutNanos < cutoff * 1000L * 1000L) { 682 return unit.toMillis(timeoutNanos) + "m"; 683 } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L) { 684 return unit.toSeconds(timeoutNanos) + "S"; 685 } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L * 60L) { 686 return unit.toMinutes(timeoutNanos) + "M"; 687 } else { 688 return unit.toHours(timeoutNanos) + "H"; 689 } 690 } 691 692 @Override parseAsciiString(String serialized)693 public Long parseAsciiString(String serialized) { 694 checkArgument(serialized.length() > 0, "empty timeout"); 695 checkArgument(serialized.length() <= 9, "bad timeout format"); 696 long value = Long.parseLong(serialized.substring(0, serialized.length() - 1)); 697 char unit = serialized.charAt(serialized.length() - 1); 698 switch (unit) { 699 case 'n': 700 return value; 701 case 'u': 702 return TimeUnit.MICROSECONDS.toNanos(value); 703 case 'm': 704 return TimeUnit.MILLISECONDS.toNanos(value); 705 case 'S': 706 return TimeUnit.SECONDS.toNanos(value); 707 case 'M': 708 return TimeUnit.MINUTES.toNanos(value); 709 case 'H': 710 return TimeUnit.HOURS.toNanos(value); 711 default: 712 throw new IllegalArgumentException(String.format("Invalid timeout unit: %s", unit)); 713 } 714 } 715 } 716 717 /** 718 * Returns a transport out of a PickResult, or {@code null} if the result is "buffer". 719 */ 720 @Nullable getTransportFromPickResult(PickResult result, boolean isWaitForReady)721 static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) { 722 final ClientTransport transport; 723 Subchannel subchannel = result.getSubchannel(); 724 if (subchannel != null) { 725 transport = ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport(); 726 } else { 727 transport = null; 728 } 729 if (transport != null) { 730 final ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); 731 if (streamTracerFactory == null) { 732 return transport; 733 } 734 return new ClientTransport() { 735 @Override 736 public ClientStream newStream( 737 MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, 738 ClientStreamTracer[] tracers) { 739 StreamInfo info = StreamInfo.newBuilder().setCallOptions(callOptions).build(); 740 ClientStreamTracer streamTracer = 741 streamTracerFactory.newClientStreamTracer(info, headers); 742 checkState(tracers[tracers.length - 1] == NOOP_TRACER, "lb tracer already assigned"); 743 tracers[tracers.length - 1] = streamTracer; 744 return transport.newStream(method, headers, callOptions, tracers); 745 } 746 747 @Override 748 public void ping(PingCallback callback, Executor executor) { 749 transport.ping(callback, executor); 750 } 751 752 @Override 753 public InternalLogId getLogId() { 754 return transport.getLogId(); 755 } 756 757 @Override 758 public ListenableFuture<SocketStats> getStats() { 759 return transport.getStats(); 760 } 761 }; 762 } 763 if (!result.getStatus().isOk()) { 764 if (result.isDrop()) { 765 return new FailingClientTransport( 766 replaceInappropriateControlPlaneStatus(result.getStatus()), RpcProgress.DROPPED); 767 } 768 if (!isWaitForReady) { 769 return new FailingClientTransport( 770 replaceInappropriateControlPlaneStatus(result.getStatus()), RpcProgress.PROCESSED); 771 } 772 } 773 return null; 774 } 775 776 /** Gets stream tracers based on CallOptions. */ 777 public static ClientStreamTracer[] getClientStreamTracers( 778 CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) { 779 List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories(); 780 ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1]; 781 StreamInfo streamInfo = StreamInfo.newBuilder() 782 .setCallOptions(callOptions) 783 .setPreviousAttempts(previousAttempts) 784 .setIsTransparentRetry(isTransparentRetry) 785 .build(); 786 for (int i = 0; i < factories.size(); i++) { 787 tracers[i] = factories.get(i).newClientStreamTracer(streamInfo, headers); 788 } 789 // Reserved to be set later by the lb as per the API contract of ClientTransport.newStream(). 790 // See also GrpcUtil.getTransportFromPickResult() 791 tracers[tracers.length - 1] = NOOP_TRACER; 792 return tracers; 793 } 794 795 /** Quietly closes all messages in MessageProducer. */ 796 static void closeQuietly(MessageProducer producer) { 797 InputStream message; 798 while ((message = producer.next()) != null) { 799 closeQuietly(message); 800 } 801 } 802 803 /** 804 * Closes a Closeable, ignoring IOExceptions. 805 * This method exists because Guava's {@code Closeables.closeQuietly()} is beta. 806 */ 807 public static void closeQuietly(@Nullable Closeable message) { 808 if (message == null) { 809 return; 810 } 811 try { 812 message.close(); 813 } catch (IOException ioException) { 814 // do nothing except log 815 log.log(Level.WARNING, "exception caught in closeQuietly", ioException); 816 } 817 } 818 819 /** Reads {@code in} until end of stream. */ 820 public static void exhaust(InputStream in) throws IOException { 821 byte[] buf = new byte[256]; 822 while (in.read(buf) != -1) {} 823 } 824 825 /** 826 * Some status codes from the control plane are not appropritate to use in the data plane. If one 827 * is given it will be replaced with INTERNAL, indicating a bug in the control plane 828 * implementation. 829 */ 830 public static Status replaceInappropriateControlPlaneStatus(Status status) { 831 checkArgument(status != null); 832 return INAPPROPRIATE_CONTROL_PLANE_STATUS.contains(status.getCode()) 833 ? Status.INTERNAL.withDescription( 834 "Inappropriate status code from control plane: " + status.getCode() + " " 835 + status.getDescription()).withCause(status.getCause()) : status; 836 } 837 838 /** 839 * Checks whether the given item exists in the iterable. This is copied from Guava Collect's 840 * {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't 841 * depend on it. 842 */ 843 static <T> boolean iterableContains(Iterable<T> iterable, T item) { 844 if (iterable instanceof Collection) { 845 Collection<?> collection = (Collection<?>) iterable; 846 try { 847 return collection.contains(item); 848 } catch (NullPointerException e) { 849 return false; 850 } catch (ClassCastException e) { 851 return false; 852 } 853 } 854 for (T i : iterable) { 855 if (Objects.equal(i, item)) { 856 return true; 857 } 858 } 859 return false; 860 } 861 862 private GrpcUtil() {} 863 } 864