1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.Objects; 23 import com.google.common.base.Preconditions; 24 import com.google.common.base.Splitter; 25 import com.google.common.base.Stopwatch; 26 import com.google.common.base.Supplier; 27 import com.google.common.util.concurrent.ListenableFuture; 28 import com.google.common.util.concurrent.MoreExecutors; 29 import com.google.common.util.concurrent.ThreadFactoryBuilder; 30 import io.grpc.CallOptions; 31 import io.grpc.ClientStreamTracer; 32 import io.grpc.InternalChannelz.SocketStats; 33 import io.grpc.InternalLogId; 34 import io.grpc.InternalMetadata; 35 import io.grpc.InternalMetadata.TrustedAsciiMarshaller; 36 import io.grpc.LoadBalancer.PickResult; 37 import io.grpc.LoadBalancer.Subchannel; 38 import io.grpc.Metadata; 39 import io.grpc.MethodDescriptor; 40 import io.grpc.Status; 41 import io.grpc.internal.ClientStreamListener.RpcProgress; 42 import io.grpc.internal.SharedResourceHolder.Resource; 43 import io.grpc.internal.StreamListener.MessageProducer; 44 import java.io.IOException; 45 import java.io.InputStream; 46 import java.lang.reflect.InvocationTargetException; 47 import java.lang.reflect.Method; 48 import java.net.HttpURLConnection; 49 import java.net.InetSocketAddress; 50 import java.net.SocketAddress; 51 import java.net.URI; 52 import java.net.URISyntaxException; 53 import java.nio.charset.Charset; 54 import java.util.Collection; 55 import java.util.concurrent.Executor; 56 import java.util.concurrent.ExecutorService; 57 import java.util.concurrent.Executors; 58 import java.util.concurrent.ScheduledExecutorService; 59 import java.util.concurrent.ThreadFactory; 60 import java.util.concurrent.TimeUnit; 61 import java.util.logging.Level; 62 import java.util.logging.Logger; 63 import javax.annotation.Nullable; 64 65 /** 66 * Common utilities for GRPC. 67 */ 68 public final class GrpcUtil { 69 70 private static final Logger log = Logger.getLogger(GrpcUtil.class.getName()); 71 72 public static final Charset US_ASCII = Charset.forName("US-ASCII"); 73 74 // AppEngine runtimes have constraints on threading and socket handling 75 // that need to be accommodated. 76 public static final boolean IS_RESTRICTED_APPENGINE = 77 System.getProperty("com.google.appengine.runtime.environment") != null 78 && "1.7".equals(System.getProperty("java.specification.version")); 79 80 /** 81 * {@link io.grpc.Metadata.Key} for the timeout header. 82 */ 83 public static final Metadata.Key<Long> TIMEOUT_KEY = 84 Metadata.Key.of(GrpcUtil.TIMEOUT, new TimeoutMarshaller()); 85 86 /** 87 * {@link io.grpc.Metadata.Key} for the message encoding header. 88 */ 89 public static final Metadata.Key<String> MESSAGE_ENCODING_KEY = 90 Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER); 91 92 /** 93 * {@link io.grpc.Metadata.Key} for the accepted message encodings header. 94 */ 95 public static final Metadata.Key<byte[]> MESSAGE_ACCEPT_ENCODING_KEY = 96 InternalMetadata.keyOf(GrpcUtil.MESSAGE_ACCEPT_ENCODING, new AcceptEncodingMarshaller()); 97 98 /** 99 * {@link io.grpc.Metadata.Key} for the stream's content encoding header. 100 */ 101 public static final Metadata.Key<String> CONTENT_ENCODING_KEY = 102 Metadata.Key.of(GrpcUtil.CONTENT_ENCODING, Metadata.ASCII_STRING_MARSHALLER); 103 104 /** 105 * {@link io.grpc.Metadata.Key} for the stream's accepted content encoding header. 106 */ 107 public static final Metadata.Key<byte[]> CONTENT_ACCEPT_ENCODING_KEY = 108 InternalMetadata.keyOf(GrpcUtil.CONTENT_ACCEPT_ENCODING, new AcceptEncodingMarshaller()); 109 110 private static final class AcceptEncodingMarshaller implements TrustedAsciiMarshaller<byte[]> { 111 @Override toAsciiString(byte[] value)112 public byte[] toAsciiString(byte[] value) { 113 return value; 114 } 115 116 @Override parseAsciiString(byte[] serialized)117 public byte[] parseAsciiString(byte[] serialized) { 118 return serialized; 119 } 120 } 121 122 /** 123 * {@link io.grpc.Metadata.Key} for the Content-Type request/response header. 124 */ 125 public static final Metadata.Key<String> CONTENT_TYPE_KEY = 126 Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER); 127 128 /** 129 * {@link io.grpc.Metadata.Key} for the Transfer encoding. 130 */ 131 public static final Metadata.Key<String> TE_HEADER = 132 Metadata.Key.of("te", Metadata.ASCII_STRING_MARSHALLER); 133 134 /** 135 * {@link io.grpc.Metadata.Key} for the Content-Type request/response header. 136 */ 137 public static final Metadata.Key<String> USER_AGENT_KEY = 138 Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER); 139 140 /** 141 * The default port for plain-text connections. 142 */ 143 public static final int DEFAULT_PORT_PLAINTEXT = 80; 144 145 /** 146 * The default port for SSL connections. 147 */ 148 public static final int DEFAULT_PORT_SSL = 443; 149 150 /** 151 * Content-Type used for GRPC-over-HTTP/2. 152 */ 153 public static final String CONTENT_TYPE_GRPC = "application/grpc"; 154 155 /** 156 * The HTTP method used for GRPC requests. 157 */ 158 public static final String HTTP_METHOD = "POST"; 159 160 /** 161 * The TE (transport encoding) header for requests over HTTP/2. 162 */ 163 public static final String TE_TRAILERS = "trailers"; 164 165 /** 166 * The Timeout header name. 167 */ 168 public static final String TIMEOUT = "grpc-timeout"; 169 170 /** 171 * The message encoding (i.e. compression) that can be used in the stream. 172 */ 173 public static final String MESSAGE_ENCODING = "grpc-encoding"; 174 175 /** 176 * The accepted message encodings (i.e. compression) that can be used in the stream. 177 */ 178 public static final String MESSAGE_ACCEPT_ENCODING = "grpc-accept-encoding"; 179 180 /** 181 * The content-encoding used to compress the full gRPC stream. 182 */ 183 public static final String CONTENT_ENCODING = "content-encoding"; 184 185 /** 186 * The accepted content-encodings that can be used to compress the full gRPC stream. 187 */ 188 public static final String CONTENT_ACCEPT_ENCODING = "accept-encoding"; 189 190 /** 191 * The default maximum uncompressed size (in bytes) for inbound messages. Defaults to 4 MiB. 192 */ 193 public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024; 194 195 /** 196 * The default maximum size (in bytes) for inbound header/trailer. 197 */ 198 // Update documentation in public-facing Builders when changing this value. 199 public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192; 200 201 public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults(); 202 203 private static final String IMPLEMENTATION_VERSION = "1.16.1"; // CURRENT_GRPC_VERSION 204 205 /** 206 * The default delay in nanos before we send a keepalive. 207 */ 208 public static final long DEFAULT_KEEPALIVE_TIME_NANOS = TimeUnit.MINUTES.toNanos(1); 209 210 /** 211 * The default timeout in nanos for a keepalive ping request. 212 */ 213 public static final long DEFAULT_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L); 214 215 /** 216 * The magic keepalive time value that disables client keepalive. 217 */ 218 public static final long KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE; 219 220 /** 221 * The default delay in nanos for server keepalive. 222 */ 223 public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L); 224 225 /** 226 * The default timeout in nanos for a server keepalive ping request. 227 */ 228 public static final long DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L); 229 230 /** 231 * The magic keepalive time value that disables keepalive. 232 */ 233 public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE; 234 235 /** 236 * The default proxy detector. 237 */ 238 public static final ProxyDetector DEFAULT_PROXY_DETECTOR = new ProxyDetectorImpl(); 239 240 /** 241 * A proxy detector that always claims no proxy is needed. 242 */ 243 public static final ProxyDetector NOOP_PROXY_DETECTOR = new ProxyDetector() { 244 @Nullable 245 @Override 246 public ProxyParameters proxyFor(SocketAddress targetServerAddress) { 247 return null; 248 } 249 }; 250 251 /** 252 * Returns a proxy detector appropriate for the current environment. 253 */ getDefaultProxyDetector()254 public static ProxyDetector getDefaultProxyDetector() { 255 if (IS_RESTRICTED_APPENGINE) { 256 return NOOP_PROXY_DETECTOR; 257 } else { 258 return DEFAULT_PROXY_DETECTOR; 259 } 260 } 261 262 /** 263 * Maps HTTP error response status codes to transport codes, as defined in <a 264 * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md"> 265 * http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is 266 * {@code true}. 267 */ httpStatusToGrpcStatus(int httpStatusCode)268 public static Status httpStatusToGrpcStatus(int httpStatusCode) { 269 return httpStatusToGrpcCode(httpStatusCode).toStatus() 270 .withDescription("HTTP status code " + httpStatusCode); 271 } 272 httpStatusToGrpcCode(int httpStatusCode)273 private static Status.Code httpStatusToGrpcCode(int httpStatusCode) { 274 if (httpStatusCode >= 100 && httpStatusCode < 200) { 275 // 1xx. These headers should have been ignored. 276 return Status.Code.INTERNAL; 277 } 278 switch (httpStatusCode) { 279 case HttpURLConnection.HTTP_BAD_REQUEST: // 400 280 case 431: // Request Header Fields Too Large 281 // TODO(carl-mastrangelo): this should be added to the http-grpc-status-mapping.md doc. 282 return Status.Code.INTERNAL; 283 case HttpURLConnection.HTTP_UNAUTHORIZED: // 401 284 return Status.Code.UNAUTHENTICATED; 285 case HttpURLConnection.HTTP_FORBIDDEN: // 403 286 return Status.Code.PERMISSION_DENIED; 287 case HttpURLConnection.HTTP_NOT_FOUND: // 404 288 return Status.Code.UNIMPLEMENTED; 289 case 429: // Too Many Requests 290 case HttpURLConnection.HTTP_BAD_GATEWAY: // 502 291 case HttpURLConnection.HTTP_UNAVAILABLE: // 503 292 case HttpURLConnection.HTTP_GATEWAY_TIMEOUT: // 504 293 return Status.Code.UNAVAILABLE; 294 default: 295 return Status.Code.UNKNOWN; 296 } 297 } 298 299 /** 300 * All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames. 301 */ 302 public enum Http2Error { 303 /** 304 * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with 305 * {@code NO_ERROR}. In this case it is important to indicate to the application that the 306 * request should be retried (i.e. {@link Status#UNAVAILABLE}). 307 */ 308 NO_ERROR(0x0, Status.UNAVAILABLE), 309 PROTOCOL_ERROR(0x1, Status.INTERNAL), 310 INTERNAL_ERROR(0x2, Status.INTERNAL), 311 FLOW_CONTROL_ERROR(0x3, Status.INTERNAL), 312 SETTINGS_TIMEOUT(0x4, Status.INTERNAL), 313 STREAM_CLOSED(0x5, Status.INTERNAL), 314 FRAME_SIZE_ERROR(0x6, Status.INTERNAL), 315 REFUSED_STREAM(0x7, Status.UNAVAILABLE), 316 CANCEL(0x8, Status.CANCELLED), 317 COMPRESSION_ERROR(0x9, Status.INTERNAL), 318 CONNECT_ERROR(0xA, Status.INTERNAL), 319 ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")), 320 INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as " 321 + "protocol is not secure enough to call")), 322 HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN); 323 324 // Populate a mapping of code to enum value for quick look-up. 325 private static final Http2Error[] codeMap = buildHttp2CodeMap(); 326 buildHttp2CodeMap()327 private static Http2Error[] buildHttp2CodeMap() { 328 Http2Error[] errors = Http2Error.values(); 329 int size = (int) errors[errors.length - 1].code() + 1; 330 Http2Error[] http2CodeMap = new Http2Error[size]; 331 for (Http2Error error : errors) { 332 int index = (int) error.code(); 333 http2CodeMap[index] = error; 334 } 335 return http2CodeMap; 336 } 337 338 private final int code; 339 // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true 340 // when there are exceptions in the Status, which is not true here. 341 @SuppressWarnings("ImmutableEnumChecker") 342 private final Status status; 343 Http2Error(int code, Status status)344 Http2Error(int code, Status status) { 345 this.code = code; 346 this.status = status.augmentDescription("HTTP/2 error code: " + this.name()); 347 } 348 349 /** 350 * Gets the code for this error used on the wire. 351 */ code()352 public long code() { 353 return code; 354 } 355 356 /** 357 * Gets the {@link Status} associated with this HTTP/2 code. 358 */ status()359 public Status status() { 360 return status; 361 } 362 363 /** 364 * Looks up the HTTP/2 error code enum value for the specified code. 365 * 366 * @param code an HTTP/2 error code value. 367 * @return the HTTP/2 error code enum or {@code null} if not found. 368 */ forCode(long code)369 public static Http2Error forCode(long code) { 370 if (code >= codeMap.length || code < 0) { 371 return null; 372 } 373 return codeMap[(int) code]; 374 } 375 376 /** 377 * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code 378 * forCode(code).status()}, to more easily conform to HTTP/2: 379 * 380 * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior. 381 * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote> 382 * 383 * @param code the HTTP/2 error code. 384 * @return a {@link Status} representing the given error. 385 */ statusForCode(long code)386 public static Status statusForCode(long code) { 387 Http2Error error = forCode(code); 388 if (error == null) { 389 // This "forgets" the message of INTERNAL_ERROR while keeping the same status code. 390 Status.Code statusCode = INTERNAL_ERROR.status().getCode(); 391 return Status.fromCodeValue(statusCode.value()) 392 .withDescription("Unrecognized HTTP/2 error code: " + code); 393 } 394 395 return error.status(); 396 } 397 } 398 399 /** 400 * Indicates whether or not the given value is a valid gRPC content-type. 401 */ isGrpcContentType(String contentType)402 public static boolean isGrpcContentType(String contentType) { 403 if (contentType == null) { 404 return false; 405 } 406 407 if (CONTENT_TYPE_GRPC.length() > contentType.length()) { 408 return false; 409 } 410 411 contentType = contentType.toLowerCase(); 412 if (!contentType.startsWith(CONTENT_TYPE_GRPC)) { 413 // Not a gRPC content-type. 414 return false; 415 } 416 417 if (contentType.length() == CONTENT_TYPE_GRPC.length()) { 418 // The strings match exactly. 419 return true; 420 } 421 422 // The contentType matches, but is longer than the expected string. 423 // We need to support variations on the content-type (e.g. +proto, +json) as defined by the 424 // gRPC wire spec. 425 char nextChar = contentType.charAt(CONTENT_TYPE_GRPC.length()); 426 return nextChar == '+' || nextChar == ';'; 427 } 428 429 /** 430 * Gets the User-Agent string for the gRPC transport. 431 */ getGrpcUserAgent( String transportName, @Nullable String applicationUserAgent)432 public static String getGrpcUserAgent( 433 String transportName, @Nullable String applicationUserAgent) { 434 StringBuilder builder = new StringBuilder(); 435 if (applicationUserAgent != null) { 436 builder.append(applicationUserAgent); 437 builder.append(' '); 438 } 439 builder.append("grpc-java-"); 440 builder.append(transportName); 441 builder.append('/'); 442 builder.append(IMPLEMENTATION_VERSION); 443 return builder.toString(); 444 } 445 446 /** 447 * Parse an authority into a URI for retrieving the host and port. 448 */ authorityToUri(String authority)449 public static URI authorityToUri(String authority) { 450 Preconditions.checkNotNull(authority, "authority"); 451 URI uri; 452 try { 453 uri = new URI(null, authority, null, null, null); 454 } catch (URISyntaxException ex) { 455 throw new IllegalArgumentException("Invalid authority: " + authority, ex); 456 } 457 return uri; 458 } 459 460 /** 461 * Verify {@code authority} is valid for use with gRPC. The syntax must be valid and it must not 462 * include userinfo. 463 * 464 * @return the {@code authority} provided 465 */ checkAuthority(String authority)466 public static String checkAuthority(String authority) { 467 URI uri = authorityToUri(authority); 468 checkArgument(uri.getHost() != null, "No host in authority '%s'", authority); 469 checkArgument(uri.getUserInfo() == null, 470 "Userinfo must not be present on authority: '%s'", authority); 471 return authority; 472 } 473 474 /** 475 * Combine a host and port into an authority string. 476 */ authorityFromHostAndPort(String host, int port)477 public static String authorityFromHostAndPort(String host, int port) { 478 try { 479 return new URI(null, null, host, port, null, null, null).getAuthority(); 480 } catch (URISyntaxException ex) { 481 throw new IllegalArgumentException("Invalid host or port: " + host + " " + port, ex); 482 } 483 } 484 485 /** 486 * Shared executor for channels. 487 */ 488 public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR = 489 new Resource<ExecutorService>() { 490 private static final String NAME = "grpc-default-executor"; 491 @Override 492 public ExecutorService create() { 493 return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true)); 494 } 495 496 @Override 497 public void close(ExecutorService instance) { 498 instance.shutdown(); 499 } 500 501 @Override 502 public String toString() { 503 return NAME; 504 } 505 }; 506 507 /** 508 * Shared single-threaded executor for managing channel timers. 509 */ 510 public static final Resource<ScheduledExecutorService> TIMER_SERVICE = 511 new Resource<ScheduledExecutorService>() { 512 @Override 513 public ScheduledExecutorService create() { 514 // We don't use newSingleThreadScheduledExecutor because it doesn't return a 515 // ScheduledThreadPoolExecutor. 516 ScheduledExecutorService service = Executors.newScheduledThreadPool( 517 1, 518 getThreadFactory("grpc-timer-%d", true)); 519 520 // If there are long timeouts that are cancelled, they will not actually be removed from 521 // the executors queue. This forces immediate removal upon cancellation to avoid a 522 // memory leak. Reflection is used because we cannot use methods added in Java 1.7. If 523 // the method does not exist, we give up. Note that the method is not present in 1.6, but 524 // _is_ present in the android standard library. 525 try { 526 Method method = service.getClass().getMethod("setRemoveOnCancelPolicy", boolean.class); 527 method.invoke(service, true); 528 } catch (NoSuchMethodException e) { 529 // no op 530 } catch (RuntimeException e) { 531 throw e; 532 } catch (Exception e) { 533 throw new RuntimeException(e); 534 } 535 536 return service; 537 } 538 539 @Override 540 public void close(ScheduledExecutorService instance) { 541 instance.shutdown(); 542 } 543 }; 544 545 546 /** 547 * Get a {@link ThreadFactory} suitable for use in the current environment. 548 * @param nameFormat to apply to threads created by the factory. 549 * @param daemon {@code true} if the threads the factory creates are daemon threads, {@code false} 550 * otherwise. 551 * @return a {@link ThreadFactory}. 552 */ getThreadFactory(String nameFormat, boolean daemon)553 public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) { 554 if (IS_RESTRICTED_APPENGINE) { 555 @SuppressWarnings("BetaApi") 556 ThreadFactory factory = MoreExecutors.platformThreadFactory(); 557 return factory; 558 } else { 559 return new ThreadFactoryBuilder() 560 .setDaemon(daemon) 561 .setNameFormat(nameFormat) 562 .build(); 563 } 564 } 565 566 /** 567 * The factory of default Stopwatches. 568 */ 569 public static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() { 570 @Override 571 public Stopwatch get() { 572 return Stopwatch.createUnstarted(); 573 } 574 }; 575 576 /** 577 * Returns the host via {@link InetSocketAddress#getHostString} if it is possible, 578 * i.e. in jdk >= 7. 579 * Otherwise, return it via {@link InetSocketAddress#getHostName} which may incur a DNS lookup. 580 */ getHost(InetSocketAddress addr)581 public static String getHost(InetSocketAddress addr) { 582 try { 583 Method getHostStringMethod = InetSocketAddress.class.getMethod("getHostString"); 584 return (String) getHostStringMethod.invoke(addr); 585 } catch (NoSuchMethodException e) { 586 // noop 587 } catch (IllegalAccessException e) { 588 // noop 589 } catch (InvocationTargetException e) { 590 // noop 591 } 592 return addr.getHostName(); 593 } 594 595 /** 596 * Marshals a nanoseconds representation of the timeout to and from a string representation, 597 * consisting of an ASCII decimal representation of a number with at most 8 digits, followed by a 598 * unit: 599 * n = nanoseconds 600 * u = microseconds 601 * m = milliseconds 602 * S = seconds 603 * M = minutes 604 * H = hours 605 * 606 * <p>The representation is greedy with respect to precision. That is, 2 seconds will be 607 * represented as `2000000u`.</p> 608 * 609 * <p>See <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">the 610 * request header definition</a></p> 611 */ 612 @VisibleForTesting 613 static class TimeoutMarshaller implements Metadata.AsciiMarshaller<Long> { 614 615 @Override toAsciiString(Long timeoutNanos)616 public String toAsciiString(Long timeoutNanos) { 617 long cutoff = 100000000; 618 TimeUnit unit = TimeUnit.NANOSECONDS; 619 if (timeoutNanos < 0) { 620 throw new IllegalArgumentException("Timeout too small"); 621 } else if (timeoutNanos < cutoff) { 622 return timeoutNanos + "n"; 623 } else if (timeoutNanos < cutoff * 1000L) { 624 return unit.toMicros(timeoutNanos) + "u"; 625 } else if (timeoutNanos < cutoff * 1000L * 1000L) { 626 return unit.toMillis(timeoutNanos) + "m"; 627 } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L) { 628 return unit.toSeconds(timeoutNanos) + "S"; 629 } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L * 60L) { 630 return unit.toMinutes(timeoutNanos) + "M"; 631 } else { 632 return unit.toHours(timeoutNanos) + "H"; 633 } 634 } 635 636 @Override parseAsciiString(String serialized)637 public Long parseAsciiString(String serialized) { 638 checkArgument(serialized.length() > 0, "empty timeout"); 639 checkArgument(serialized.length() <= 9, "bad timeout format"); 640 long value = Long.parseLong(serialized.substring(0, serialized.length() - 1)); 641 char unit = serialized.charAt(serialized.length() - 1); 642 switch (unit) { 643 case 'n': 644 return value; 645 case 'u': 646 return TimeUnit.MICROSECONDS.toNanos(value); 647 case 'm': 648 return TimeUnit.MILLISECONDS.toNanos(value); 649 case 'S': 650 return TimeUnit.SECONDS.toNanos(value); 651 case 'M': 652 return TimeUnit.MINUTES.toNanos(value); 653 case 'H': 654 return TimeUnit.HOURS.toNanos(value); 655 default: 656 throw new IllegalArgumentException(String.format("Invalid timeout unit: %s", unit)); 657 } 658 } 659 } 660 661 /** 662 * Returns a transport out of a PickResult, or {@code null} if the result is "buffer". 663 */ 664 @Nullable getTransportFromPickResult(PickResult result, boolean isWaitForReady)665 static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) { 666 final ClientTransport transport; 667 Subchannel subchannel = result.getSubchannel(); 668 if (subchannel != null) { 669 transport = ((AbstractSubchannel) subchannel).obtainActiveTransport(); 670 } else { 671 transport = null; 672 } 673 if (transport != null) { 674 final ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); 675 if (streamTracerFactory == null) { 676 return transport; 677 } 678 return new ClientTransport() { 679 @Override 680 public ClientStream newStream( 681 MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) { 682 return transport.newStream( 683 method, headers, callOptions.withStreamTracerFactory(streamTracerFactory)); 684 } 685 686 @Override 687 public void ping(PingCallback callback, Executor executor) { 688 transport.ping(callback, executor); 689 } 690 691 @Override 692 public InternalLogId getLogId() { 693 return transport.getLogId(); 694 } 695 696 @Override 697 public ListenableFuture<SocketStats> getStats() { 698 return transport.getStats(); 699 } 700 }; 701 } 702 if (!result.getStatus().isOk()) { 703 if (result.isDrop()) { 704 return new FailingClientTransport(result.getStatus(), RpcProgress.DROPPED); 705 } 706 if (!isWaitForReady) { 707 return new FailingClientTransport(result.getStatus(), RpcProgress.PROCESSED); 708 } 709 } 710 return null; 711 } 712 713 /** Quietly closes all messages in MessageProducer. */ 714 static void closeQuietly(MessageProducer producer) { 715 InputStream message; 716 while ((message = producer.next()) != null) { 717 closeQuietly(message); 718 } 719 } 720 721 /** 722 * Closes an InputStream, ignoring IOExceptions. 723 * This method exists because Guava's {@code Closeables.closeQuietly()} is beta. 724 */ 725 public static void closeQuietly(@Nullable InputStream message) { 726 if (message == null) { 727 return; 728 } 729 try { 730 message.close(); 731 } catch (IOException ioException) { 732 // do nothing except log 733 log.log(Level.WARNING, "exception caught in closeQuietly", ioException); 734 } 735 } 736 737 /** 738 * Checks whether the given item exists in the iterable. This is copied from Guava Collect's 739 * {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't 740 * depend on it. 741 */ 742 static <T> boolean iterableContains(Iterable<T> iterable, T item) { 743 if (iterable instanceof Collection) { 744 Collection<?> collection = (Collection<?>) iterable; 745 try { 746 return collection.contains(item); 747 } catch (NullPointerException e) { 748 return false; 749 } catch (ClassCastException e) { 750 return false; 751 } 752 } 753 for (T i : iterable) { 754 if (Objects.equal(i, item)) { 755 return true; 756 } 757 } 758 return false; 759 } 760 761 private GrpcUtil() {} 762 } 763