• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Objects;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Splitter;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.base.Supplier;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
30 import io.grpc.CallOptions;
31 import io.grpc.ClientStreamTracer;
32 import io.grpc.InternalChannelz.SocketStats;
33 import io.grpc.InternalLogId;
34 import io.grpc.InternalMetadata;
35 import io.grpc.InternalMetadata.TrustedAsciiMarshaller;
36 import io.grpc.LoadBalancer.PickResult;
37 import io.grpc.LoadBalancer.Subchannel;
38 import io.grpc.Metadata;
39 import io.grpc.MethodDescriptor;
40 import io.grpc.Status;
41 import io.grpc.internal.ClientStreamListener.RpcProgress;
42 import io.grpc.internal.SharedResourceHolder.Resource;
43 import io.grpc.internal.StreamListener.MessageProducer;
44 import java.io.IOException;
45 import java.io.InputStream;
46 import java.lang.reflect.InvocationTargetException;
47 import java.lang.reflect.Method;
48 import java.net.HttpURLConnection;
49 import java.net.InetSocketAddress;
50 import java.net.SocketAddress;
51 import java.net.URI;
52 import java.net.URISyntaxException;
53 import java.nio.charset.Charset;
54 import java.util.Collection;
55 import java.util.concurrent.Executor;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.ScheduledExecutorService;
59 import java.util.concurrent.ThreadFactory;
60 import java.util.concurrent.TimeUnit;
61 import java.util.logging.Level;
62 import java.util.logging.Logger;
63 import javax.annotation.Nullable;
64 
65 /**
66  * Common utilities for GRPC.
67  */
68 public final class GrpcUtil {
69 
70   private static final Logger log = Logger.getLogger(GrpcUtil.class.getName());
71 
72   public static final Charset US_ASCII = Charset.forName("US-ASCII");
73 
74   // AppEngine runtimes have constraints on threading and socket handling
75   // that need to be accommodated.
76   public static final boolean IS_RESTRICTED_APPENGINE =
77       System.getProperty("com.google.appengine.runtime.environment") != null
78           && "1.7".equals(System.getProperty("java.specification.version"));
79 
80   /**
81    * {@link io.grpc.Metadata.Key} for the timeout header.
82    */
83   public static final Metadata.Key<Long> TIMEOUT_KEY =
84           Metadata.Key.of(GrpcUtil.TIMEOUT, new TimeoutMarshaller());
85 
86   /**
87    * {@link io.grpc.Metadata.Key} for the message encoding header.
88    */
89   public static final Metadata.Key<String> MESSAGE_ENCODING_KEY =
90           Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER);
91 
92   /**
93    * {@link io.grpc.Metadata.Key} for the accepted message encodings header.
94    */
95   public static final Metadata.Key<byte[]> MESSAGE_ACCEPT_ENCODING_KEY =
96       InternalMetadata.keyOf(GrpcUtil.MESSAGE_ACCEPT_ENCODING, new AcceptEncodingMarshaller());
97 
98   /**
99    * {@link io.grpc.Metadata.Key} for the stream's content encoding header.
100    */
101   public static final Metadata.Key<String> CONTENT_ENCODING_KEY =
102       Metadata.Key.of(GrpcUtil.CONTENT_ENCODING, Metadata.ASCII_STRING_MARSHALLER);
103 
104   /**
105    * {@link io.grpc.Metadata.Key} for the stream's accepted content encoding header.
106    */
107   public static final Metadata.Key<byte[]> CONTENT_ACCEPT_ENCODING_KEY =
108       InternalMetadata.keyOf(GrpcUtil.CONTENT_ACCEPT_ENCODING, new AcceptEncodingMarshaller());
109 
110   private static final class AcceptEncodingMarshaller implements TrustedAsciiMarshaller<byte[]> {
111     @Override
toAsciiString(byte[] value)112     public byte[] toAsciiString(byte[] value) {
113       return value;
114     }
115 
116     @Override
parseAsciiString(byte[] serialized)117     public byte[] parseAsciiString(byte[] serialized) {
118       return serialized;
119     }
120   }
121 
122   /**
123    * {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
124    */
125   public static final Metadata.Key<String> CONTENT_TYPE_KEY =
126           Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER);
127 
128   /**
129    * {@link io.grpc.Metadata.Key} for the Transfer encoding.
130    */
131   public static final Metadata.Key<String> TE_HEADER =
132       Metadata.Key.of("te", Metadata.ASCII_STRING_MARSHALLER);
133 
134   /**
135    * {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
136    */
137   public static final Metadata.Key<String> USER_AGENT_KEY =
138           Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER);
139 
140   /**
141    * The default port for plain-text connections.
142    */
143   public static final int DEFAULT_PORT_PLAINTEXT = 80;
144 
145   /**
146    * The default port for SSL connections.
147    */
148   public static final int DEFAULT_PORT_SSL = 443;
149 
150   /**
151    * Content-Type used for GRPC-over-HTTP/2.
152    */
153   public static final String CONTENT_TYPE_GRPC = "application/grpc";
154 
155   /**
156    * The HTTP method used for GRPC requests.
157    */
158   public static final String HTTP_METHOD = "POST";
159 
160   /**
161    * The TE (transport encoding) header for requests over HTTP/2.
162    */
163   public static final String TE_TRAILERS = "trailers";
164 
165   /**
166    * The Timeout header name.
167    */
168   public static final String TIMEOUT = "grpc-timeout";
169 
170   /**
171    * The message encoding (i.e. compression) that can be used in the stream.
172    */
173   public static final String MESSAGE_ENCODING = "grpc-encoding";
174 
175   /**
176    * The accepted message encodings (i.e. compression) that can be used in the stream.
177    */
178   public static final String MESSAGE_ACCEPT_ENCODING = "grpc-accept-encoding";
179 
180   /**
181    * The content-encoding used to compress the full gRPC stream.
182    */
183   public static final String CONTENT_ENCODING = "content-encoding";
184 
185   /**
186    * The accepted content-encodings that can be used to compress the full gRPC stream.
187    */
188   public static final String CONTENT_ACCEPT_ENCODING = "accept-encoding";
189 
190   /**
191    * The default maximum uncompressed size (in bytes) for inbound messages. Defaults to 4 MiB.
192    */
193   public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;
194 
195   /**
196    * The default maximum size (in bytes) for inbound header/trailer.
197    */
198   // Update documentation in public-facing Builders when changing this value.
199   public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192;
200 
201   public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults();
202 
203   private static final String IMPLEMENTATION_VERSION = "1.16.1"; // CURRENT_GRPC_VERSION
204 
205   /**
206    * The default delay in nanos before we send a keepalive.
207    */
208   public static final long DEFAULT_KEEPALIVE_TIME_NANOS = TimeUnit.MINUTES.toNanos(1);
209 
210   /**
211    * The default timeout in nanos for a keepalive ping request.
212    */
213   public static final long DEFAULT_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L);
214 
215   /**
216    * The magic keepalive time value that disables client keepalive.
217    */
218   public static final long KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
219 
220   /**
221    * The default delay in nanos for server keepalive.
222    */
223   public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
224 
225   /**
226    * The default timeout in nanos for a server keepalive ping request.
227    */
228   public static final long DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L);
229 
230   /**
231    * The magic keepalive time value that disables keepalive.
232    */
233   public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
234 
235   /**
236    * The default proxy detector.
237    */
238   public static final ProxyDetector DEFAULT_PROXY_DETECTOR = new ProxyDetectorImpl();
239 
240   /**
241    * A proxy detector that always claims no proxy is needed.
242    */
243   public static final ProxyDetector NOOP_PROXY_DETECTOR = new ProxyDetector() {
244     @Nullable
245     @Override
246     public ProxyParameters proxyFor(SocketAddress targetServerAddress) {
247       return null;
248     }
249   };
250 
251   /**
252    * Returns a proxy detector appropriate for the current environment.
253    */
getDefaultProxyDetector()254   public static ProxyDetector getDefaultProxyDetector() {
255     if (IS_RESTRICTED_APPENGINE) {
256       return NOOP_PROXY_DETECTOR;
257     } else {
258       return DEFAULT_PROXY_DETECTOR;
259     }
260   }
261 
262   /**
263    * Maps HTTP error response status codes to transport codes, as defined in <a
264    * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md">
265    * http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is
266    * {@code true}.
267    */
httpStatusToGrpcStatus(int httpStatusCode)268   public static Status httpStatusToGrpcStatus(int httpStatusCode) {
269     return httpStatusToGrpcCode(httpStatusCode).toStatus()
270         .withDescription("HTTP status code " + httpStatusCode);
271   }
272 
httpStatusToGrpcCode(int httpStatusCode)273   private static Status.Code httpStatusToGrpcCode(int httpStatusCode) {
274     if (httpStatusCode >= 100 && httpStatusCode < 200) {
275       // 1xx. These headers should have been ignored.
276       return Status.Code.INTERNAL;
277     }
278     switch (httpStatusCode) {
279       case HttpURLConnection.HTTP_BAD_REQUEST:  // 400
280       case 431: // Request Header Fields Too Large
281         // TODO(carl-mastrangelo): this should be added to the http-grpc-status-mapping.md doc.
282         return Status.Code.INTERNAL;
283       case HttpURLConnection.HTTP_UNAUTHORIZED:  // 401
284         return Status.Code.UNAUTHENTICATED;
285       case HttpURLConnection.HTTP_FORBIDDEN:  // 403
286         return Status.Code.PERMISSION_DENIED;
287       case HttpURLConnection.HTTP_NOT_FOUND:  // 404
288         return Status.Code.UNIMPLEMENTED;
289       case 429:  // Too Many Requests
290       case HttpURLConnection.HTTP_BAD_GATEWAY:  // 502
291       case HttpURLConnection.HTTP_UNAVAILABLE:  // 503
292       case HttpURLConnection.HTTP_GATEWAY_TIMEOUT:  // 504
293         return Status.Code.UNAVAILABLE;
294       default:
295         return Status.Code.UNKNOWN;
296     }
297   }
298 
299   /**
300    * All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames.
301    */
302   public enum Http2Error {
303     /**
304      * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with
305      * {@code NO_ERROR}. In this case it is important to indicate to the application that the
306      * request should be retried (i.e. {@link Status#UNAVAILABLE}).
307      */
308     NO_ERROR(0x0, Status.UNAVAILABLE),
309     PROTOCOL_ERROR(0x1, Status.INTERNAL),
310     INTERNAL_ERROR(0x2, Status.INTERNAL),
311     FLOW_CONTROL_ERROR(0x3, Status.INTERNAL),
312     SETTINGS_TIMEOUT(0x4, Status.INTERNAL),
313     STREAM_CLOSED(0x5, Status.INTERNAL),
314     FRAME_SIZE_ERROR(0x6, Status.INTERNAL),
315     REFUSED_STREAM(0x7, Status.UNAVAILABLE),
316     CANCEL(0x8, Status.CANCELLED),
317     COMPRESSION_ERROR(0x9, Status.INTERNAL),
318     CONNECT_ERROR(0xA, Status.INTERNAL),
319     ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")),
320     INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as "
321         + "protocol is not secure enough to call")),
322     HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN);
323 
324     // Populate a mapping of code to enum value for quick look-up.
325     private static final Http2Error[] codeMap = buildHttp2CodeMap();
326 
buildHttp2CodeMap()327     private static Http2Error[] buildHttp2CodeMap() {
328       Http2Error[] errors = Http2Error.values();
329       int size = (int) errors[errors.length - 1].code() + 1;
330       Http2Error[] http2CodeMap = new Http2Error[size];
331       for (Http2Error error : errors) {
332         int index = (int) error.code();
333         http2CodeMap[index] = error;
334       }
335       return http2CodeMap;
336     }
337 
338     private final int code;
339     // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true
340     // when there are exceptions in the Status, which is not true here.
341     @SuppressWarnings("ImmutableEnumChecker")
342     private final Status status;
343 
Http2Error(int code, Status status)344     Http2Error(int code, Status status) {
345       this.code = code;
346       this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
347     }
348 
349     /**
350      * Gets the code for this error used on the wire.
351      */
code()352     public long code() {
353       return code;
354     }
355 
356     /**
357      * Gets the {@link Status} associated with this HTTP/2 code.
358      */
status()359     public Status status() {
360       return status;
361     }
362 
363     /**
364      * Looks up the HTTP/2 error code enum value for the specified code.
365      *
366      * @param code an HTTP/2 error code value.
367      * @return the HTTP/2 error code enum or {@code null} if not found.
368      */
forCode(long code)369     public static Http2Error forCode(long code) {
370       if (code >= codeMap.length || code < 0) {
371         return null;
372       }
373       return codeMap[(int) code];
374     }
375 
376     /**
377      * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code
378      * forCode(code).status()}, to more easily conform to HTTP/2:
379      *
380      * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior.
381      * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote>
382      *
383      * @param code the HTTP/2 error code.
384      * @return a {@link Status} representing the given error.
385      */
statusForCode(long code)386     public static Status statusForCode(long code) {
387       Http2Error error = forCode(code);
388       if (error == null) {
389         // This "forgets" the message of INTERNAL_ERROR while keeping the same status code.
390         Status.Code statusCode = INTERNAL_ERROR.status().getCode();
391         return Status.fromCodeValue(statusCode.value())
392             .withDescription("Unrecognized HTTP/2 error code: " + code);
393       }
394 
395       return error.status();
396     }
397   }
398 
399   /**
400    * Indicates whether or not the given value is a valid gRPC content-type.
401    */
isGrpcContentType(String contentType)402   public static boolean isGrpcContentType(String contentType) {
403     if (contentType == null) {
404       return false;
405     }
406 
407     if (CONTENT_TYPE_GRPC.length() > contentType.length()) {
408       return false;
409     }
410 
411     contentType = contentType.toLowerCase();
412     if (!contentType.startsWith(CONTENT_TYPE_GRPC)) {
413       // Not a gRPC content-type.
414       return false;
415     }
416 
417     if (contentType.length() == CONTENT_TYPE_GRPC.length()) {
418       // The strings match exactly.
419       return true;
420     }
421 
422     // The contentType matches, but is longer than the expected string.
423     // We need to support variations on the content-type (e.g. +proto, +json) as defined by the
424     // gRPC wire spec.
425     char nextChar = contentType.charAt(CONTENT_TYPE_GRPC.length());
426     return nextChar == '+' || nextChar == ';';
427   }
428 
429   /**
430    * Gets the User-Agent string for the gRPC transport.
431    */
getGrpcUserAgent( String transportName, @Nullable String applicationUserAgent)432   public static String getGrpcUserAgent(
433       String transportName, @Nullable String applicationUserAgent) {
434     StringBuilder builder = new StringBuilder();
435     if (applicationUserAgent != null) {
436       builder.append(applicationUserAgent);
437       builder.append(' ');
438     }
439     builder.append("grpc-java-");
440     builder.append(transportName);
441     builder.append('/');
442     builder.append(IMPLEMENTATION_VERSION);
443     return builder.toString();
444   }
445 
446   /**
447    * Parse an authority into a URI for retrieving the host and port.
448    */
authorityToUri(String authority)449   public static URI authorityToUri(String authority) {
450     Preconditions.checkNotNull(authority, "authority");
451     URI uri;
452     try {
453       uri = new URI(null, authority, null, null, null);
454     } catch (URISyntaxException ex) {
455       throw new IllegalArgumentException("Invalid authority: " + authority, ex);
456     }
457     return uri;
458   }
459 
460   /**
461    * Verify {@code authority} is valid for use with gRPC. The syntax must be valid and it must not
462    * include userinfo.
463    *
464    * @return the {@code authority} provided
465    */
checkAuthority(String authority)466   public static String checkAuthority(String authority) {
467     URI uri = authorityToUri(authority);
468     checkArgument(uri.getHost() != null, "No host in authority '%s'", authority);
469     checkArgument(uri.getUserInfo() == null,
470         "Userinfo must not be present on authority: '%s'", authority);
471     return authority;
472   }
473 
474   /**
475    * Combine a host and port into an authority string.
476    */
authorityFromHostAndPort(String host, int port)477   public static String authorityFromHostAndPort(String host, int port) {
478     try {
479       return new URI(null, null, host, port, null, null, null).getAuthority();
480     } catch (URISyntaxException ex) {
481       throw new IllegalArgumentException("Invalid host or port: " + host + " " + port, ex);
482     }
483   }
484 
485   /**
486    * Shared executor for channels.
487    */
488   public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
489       new Resource<ExecutorService>() {
490         private static final String NAME = "grpc-default-executor";
491         @Override
492         public ExecutorService create() {
493           return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
494         }
495 
496         @Override
497         public void close(ExecutorService instance) {
498           instance.shutdown();
499         }
500 
501         @Override
502         public String toString() {
503           return NAME;
504         }
505       };
506 
507   /**
508    * Shared single-threaded executor for managing channel timers.
509    */
510   public static final Resource<ScheduledExecutorService> TIMER_SERVICE =
511       new Resource<ScheduledExecutorService>() {
512         @Override
513         public ScheduledExecutorService create() {
514           // We don't use newSingleThreadScheduledExecutor because it doesn't return a
515           // ScheduledThreadPoolExecutor.
516           ScheduledExecutorService service = Executors.newScheduledThreadPool(
517               1,
518               getThreadFactory("grpc-timer-%d", true));
519 
520           // If there are long timeouts that are cancelled, they will not actually be removed from
521           // the executors queue.  This forces immediate removal upon cancellation to avoid a
522           // memory leak.  Reflection is used because we cannot use methods added in Java 1.7.  If
523           // the method does not exist, we give up.  Note that the method is not present in 1.6, but
524           // _is_ present in the android standard library.
525           try {
526             Method method = service.getClass().getMethod("setRemoveOnCancelPolicy", boolean.class);
527             method.invoke(service, true);
528           } catch (NoSuchMethodException e) {
529             // no op
530           } catch (RuntimeException e) {
531             throw e;
532           } catch (Exception e) {
533             throw new RuntimeException(e);
534           }
535 
536           return service;
537         }
538 
539         @Override
540         public void close(ScheduledExecutorService instance) {
541           instance.shutdown();
542         }
543       };
544 
545 
546   /**
547    * Get a {@link ThreadFactory} suitable for use in the current environment.
548    * @param nameFormat to apply to threads created by the factory.
549    * @param daemon {@code true} if the threads the factory creates are daemon threads, {@code false}
550    *     otherwise.
551    * @return a {@link ThreadFactory}.
552    */
getThreadFactory(String nameFormat, boolean daemon)553   public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
554     if (IS_RESTRICTED_APPENGINE) {
555       @SuppressWarnings("BetaApi")
556       ThreadFactory factory = MoreExecutors.platformThreadFactory();
557       return factory;
558     } else {
559       return new ThreadFactoryBuilder()
560           .setDaemon(daemon)
561           .setNameFormat(nameFormat)
562           .build();
563     }
564   }
565 
566   /**
567    * The factory of default Stopwatches.
568    */
569   public static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() {
570       @Override
571       public Stopwatch get() {
572         return Stopwatch.createUnstarted();
573       }
574     };
575 
576   /**
577    * Returns the host via {@link InetSocketAddress#getHostString} if it is possible,
578    * i.e. in jdk >= 7.
579    * Otherwise, return it via {@link InetSocketAddress#getHostName} which may incur a DNS lookup.
580    */
getHost(InetSocketAddress addr)581   public static String getHost(InetSocketAddress addr) {
582     try {
583       Method getHostStringMethod = InetSocketAddress.class.getMethod("getHostString");
584       return (String) getHostStringMethod.invoke(addr);
585     } catch (NoSuchMethodException e) {
586       // noop
587     } catch (IllegalAccessException e) {
588       // noop
589     } catch (InvocationTargetException e) {
590       // noop
591     }
592     return addr.getHostName();
593   }
594 
595   /**
596    * Marshals a nanoseconds representation of the timeout to and from a string representation,
597    * consisting of an ASCII decimal representation of a number with at most 8 digits, followed by a
598    * unit:
599    * n = nanoseconds
600    * u = microseconds
601    * m = milliseconds
602    * S = seconds
603    * M = minutes
604    * H = hours
605    *
606    * <p>The representation is greedy with respect to precision. That is, 2 seconds will be
607    * represented as `2000000u`.</p>
608    *
609    * <p>See <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">the
610    * request header definition</a></p>
611    */
612   @VisibleForTesting
613   static class TimeoutMarshaller implements Metadata.AsciiMarshaller<Long> {
614 
615     @Override
toAsciiString(Long timeoutNanos)616     public String toAsciiString(Long timeoutNanos) {
617       long cutoff = 100000000;
618       TimeUnit unit = TimeUnit.NANOSECONDS;
619       if (timeoutNanos < 0) {
620         throw new IllegalArgumentException("Timeout too small");
621       } else if (timeoutNanos < cutoff) {
622         return timeoutNanos + "n";
623       } else if (timeoutNanos < cutoff * 1000L) {
624         return unit.toMicros(timeoutNanos) + "u";
625       } else if (timeoutNanos < cutoff * 1000L * 1000L) {
626         return unit.toMillis(timeoutNanos) + "m";
627       } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L) {
628         return unit.toSeconds(timeoutNanos) + "S";
629       } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L * 60L) {
630         return unit.toMinutes(timeoutNanos) + "M";
631       } else {
632         return unit.toHours(timeoutNanos) + "H";
633       }
634     }
635 
636     @Override
parseAsciiString(String serialized)637     public Long parseAsciiString(String serialized) {
638       checkArgument(serialized.length() > 0, "empty timeout");
639       checkArgument(serialized.length() <= 9, "bad timeout format");
640       long value = Long.parseLong(serialized.substring(0, serialized.length() - 1));
641       char unit = serialized.charAt(serialized.length() - 1);
642       switch (unit) {
643         case 'n':
644           return value;
645         case 'u':
646           return TimeUnit.MICROSECONDS.toNanos(value);
647         case 'm':
648           return TimeUnit.MILLISECONDS.toNanos(value);
649         case 'S':
650           return TimeUnit.SECONDS.toNanos(value);
651         case 'M':
652           return TimeUnit.MINUTES.toNanos(value);
653         case 'H':
654           return TimeUnit.HOURS.toNanos(value);
655         default:
656           throw new IllegalArgumentException(String.format("Invalid timeout unit: %s", unit));
657       }
658     }
659   }
660 
661   /**
662    * Returns a transport out of a PickResult, or {@code null} if the result is "buffer".
663    */
664   @Nullable
getTransportFromPickResult(PickResult result, boolean isWaitForReady)665   static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) {
666     final ClientTransport transport;
667     Subchannel subchannel = result.getSubchannel();
668     if (subchannel != null) {
669       transport = ((AbstractSubchannel) subchannel).obtainActiveTransport();
670     } else {
671       transport = null;
672     }
673     if (transport != null) {
674       final ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
675       if (streamTracerFactory == null) {
676         return transport;
677       }
678       return new ClientTransport() {
679         @Override
680         public ClientStream newStream(
681             MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
682           return transport.newStream(
683               method, headers, callOptions.withStreamTracerFactory(streamTracerFactory));
684         }
685 
686         @Override
687         public void ping(PingCallback callback, Executor executor) {
688           transport.ping(callback, executor);
689         }
690 
691         @Override
692         public InternalLogId getLogId() {
693           return transport.getLogId();
694         }
695 
696         @Override
697         public ListenableFuture<SocketStats> getStats() {
698           return transport.getStats();
699         }
700       };
701     }
702     if (!result.getStatus().isOk()) {
703       if (result.isDrop()) {
704         return new FailingClientTransport(result.getStatus(), RpcProgress.DROPPED);
705       }
706       if (!isWaitForReady) {
707         return new FailingClientTransport(result.getStatus(), RpcProgress.PROCESSED);
708       }
709     }
710     return null;
711   }
712 
713   /** Quietly closes all messages in MessageProducer. */
714   static void closeQuietly(MessageProducer producer) {
715     InputStream message;
716     while ((message = producer.next()) != null) {
717       closeQuietly(message);
718     }
719   }
720 
721   /**
722    * Closes an InputStream, ignoring IOExceptions.
723    * This method exists because Guava's {@code Closeables.closeQuietly()} is beta.
724    */
725   public static void closeQuietly(@Nullable InputStream message) {
726     if (message == null) {
727       return;
728     }
729     try {
730       message.close();
731     } catch (IOException ioException) {
732       // do nothing except log
733       log.log(Level.WARNING, "exception caught in closeQuietly", ioException);
734     }
735   }
736 
737   /**
738    * Checks whether the given item exists in the iterable.  This is copied from Guava Collect's
739    * {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't
740    * depend on it.
741    */
742   static <T> boolean iterableContains(Iterable<T> iterable, T item) {
743     if (iterable instanceof Collection) {
744       Collection<?> collection = (Collection<?>) iterable;
745       try {
746         return collection.contains(item);
747       } catch (NullPointerException e) {
748         return false;
749       } catch (ClassCastException e) {
750         return false;
751       }
752     }
753     for (T i : iterable) {
754       if (Objects.equal(i, item)) {
755         return true;
756       }
757     }
758     return false;
759   }
760 
761   private GrpcUtil() {}
762 }
763