• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Objects;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Splitter;
26 import com.google.common.base.Stopwatch;
27 import com.google.common.base.Supplier;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
30 import io.grpc.CallOptions;
31 import io.grpc.ClientStreamTracer;
32 import io.grpc.ClientStreamTracer.StreamInfo;
33 import io.grpc.InternalChannelz.SocketStats;
34 import io.grpc.InternalLogId;
35 import io.grpc.InternalMetadata;
36 import io.grpc.InternalMetadata.TrustedAsciiMarshaller;
37 import io.grpc.LoadBalancer.PickResult;
38 import io.grpc.LoadBalancer.Subchannel;
39 import io.grpc.Metadata;
40 import io.grpc.MethodDescriptor;
41 import io.grpc.ProxiedSocketAddress;
42 import io.grpc.ProxyDetector;
43 import io.grpc.Status;
44 import io.grpc.internal.ClientStreamListener.RpcProgress;
45 import io.grpc.internal.SharedResourceHolder.Resource;
46 import io.grpc.internal.StreamListener.MessageProducer;
47 import java.io.Closeable;
48 import java.io.IOException;
49 import java.io.InputStream;
50 import java.lang.reflect.InvocationTargetException;
51 import java.lang.reflect.Method;
52 import java.net.HttpURLConnection;
53 import java.net.InetSocketAddress;
54 import java.net.SocketAddress;
55 import java.net.URI;
56 import java.net.URISyntaxException;
57 import java.nio.charset.Charset;
58 import java.util.Collection;
59 import java.util.Collections;
60 import java.util.EnumSet;
61 import java.util.List;
62 import java.util.Locale;
63 import java.util.Set;
64 import java.util.concurrent.Executor;
65 import java.util.concurrent.ExecutorService;
66 import java.util.concurrent.Executors;
67 import java.util.concurrent.ScheduledExecutorService;
68 import java.util.concurrent.ThreadFactory;
69 import java.util.concurrent.TimeUnit;
70 import java.util.logging.Level;
71 import java.util.logging.Logger;
72 import javax.annotation.Nullable;
73 import javax.annotation.concurrent.Immutable;
74 
75 /**
76  * Common utilities for GRPC.
77  */
78 public final class GrpcUtil {
79 
80   private static final Logger log = Logger.getLogger(GrpcUtil.class.getName());
81 
82   private static final Set<Status.Code> INAPPROPRIATE_CONTROL_PLANE_STATUS
83       = Collections.unmodifiableSet(EnumSet.of(
84           Status.Code.OK,
85           Status.Code.INVALID_ARGUMENT,
86           Status.Code.NOT_FOUND,
87           Status.Code.ALREADY_EXISTS,
88           Status.Code.FAILED_PRECONDITION,
89           Status.Code.ABORTED,
90           Status.Code.OUT_OF_RANGE,
91           Status.Code.DATA_LOSS));
92 
93   public static final Charset US_ASCII = Charset.forName("US-ASCII");
94 
95   /**
96    * {@link io.grpc.Metadata.Key} for the timeout header.
97    */
98   public static final Metadata.Key<Long> TIMEOUT_KEY =
99           Metadata.Key.of(GrpcUtil.TIMEOUT, new TimeoutMarshaller());
100 
101   /**
102    * {@link io.grpc.Metadata.Key} for the message encoding header.
103    */
104   public static final Metadata.Key<String> MESSAGE_ENCODING_KEY =
105           Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER);
106 
107   /**
108    * {@link io.grpc.Metadata.Key} for the accepted message encodings header.
109    */
110   public static final Metadata.Key<byte[]> MESSAGE_ACCEPT_ENCODING_KEY =
111       InternalMetadata.keyOf(GrpcUtil.MESSAGE_ACCEPT_ENCODING, new AcceptEncodingMarshaller());
112 
113   /**
114    * {@link io.grpc.Metadata.Key} for the stream's content encoding header.
115    */
116   public static final Metadata.Key<String> CONTENT_ENCODING_KEY =
117       Metadata.Key.of(GrpcUtil.CONTENT_ENCODING, Metadata.ASCII_STRING_MARSHALLER);
118 
119   /**
120    * {@link io.grpc.Metadata.Key} for the stream's accepted content encoding header.
121    */
122   public static final Metadata.Key<byte[]> CONTENT_ACCEPT_ENCODING_KEY =
123       InternalMetadata.keyOf(GrpcUtil.CONTENT_ACCEPT_ENCODING, new AcceptEncodingMarshaller());
124 
125   static final Metadata.Key<String> CONTENT_LENGTH_KEY =
126       Metadata.Key.of("content-length", Metadata.ASCII_STRING_MARSHALLER);
127 
128   private static final class AcceptEncodingMarshaller implements TrustedAsciiMarshaller<byte[]> {
129     @Override
toAsciiString(byte[] value)130     public byte[] toAsciiString(byte[] value) {
131       return value;
132     }
133 
134     @Override
parseAsciiString(byte[] serialized)135     public byte[] parseAsciiString(byte[] serialized) {
136       return serialized;
137     }
138   }
139 
140   /**
141    * {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
142    */
143   public static final Metadata.Key<String> CONTENT_TYPE_KEY =
144           Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER);
145 
146   /**
147    * {@link io.grpc.Metadata.Key} for the Transfer encoding.
148    */
149   public static final Metadata.Key<String> TE_HEADER =
150       Metadata.Key.of("te", Metadata.ASCII_STRING_MARSHALLER);
151 
152   /**
153    * {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
154    */
155   public static final Metadata.Key<String> USER_AGENT_KEY =
156           Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER);
157 
158   /**
159    * The default port for plain-text connections.
160    */
161   public static final int DEFAULT_PORT_PLAINTEXT = 80;
162 
163   /**
164    * The default port for SSL connections.
165    */
166   public static final int DEFAULT_PORT_SSL = 443;
167 
168   /**
169    * Content-Type used for GRPC-over-HTTP/2.
170    */
171   public static final String CONTENT_TYPE_GRPC = "application/grpc";
172 
173   /**
174    * The HTTP method used for GRPC requests.
175    */
176   public static final String HTTP_METHOD = "POST";
177 
178   /**
179    * The TE (transport encoding) header for requests over HTTP/2.
180    */
181   public static final String TE_TRAILERS = "trailers";
182 
183   /**
184    * The Timeout header name.
185    */
186   public static final String TIMEOUT = "grpc-timeout";
187 
188   /**
189    * The message encoding (i.e. compression) that can be used in the stream.
190    */
191   public static final String MESSAGE_ENCODING = "grpc-encoding";
192 
193   /**
194    * The accepted message encodings (i.e. compression) that can be used in the stream.
195    */
196   public static final String MESSAGE_ACCEPT_ENCODING = "grpc-accept-encoding";
197 
198   /**
199    * The content-encoding used to compress the full gRPC stream.
200    */
201   public static final String CONTENT_ENCODING = "content-encoding";
202 
203   /**
204    * The accepted content-encodings that can be used to compress the full gRPC stream.
205    */
206   public static final String CONTENT_ACCEPT_ENCODING = "accept-encoding";
207 
208   /**
209    * The default maximum uncompressed size (in bytes) for inbound messages. Defaults to 4 MiB.
210    */
211   public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;
212 
213   /**
214    * The default maximum size (in bytes) for inbound header/trailer.
215    */
216   // Update documentation in public-facing Builders when changing this value.
217   public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192;
218 
219   public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults();
220 
221   private static final String IMPLEMENTATION_VERSION = "1.56.1-SNAPSHOT"; // CURRENT_GRPC_VERSION
222 
223   /**
224    * The default timeout in nanos for a keepalive ping request.
225    */
226   public static final long DEFAULT_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L);
227 
228   /**
229    * The magic keepalive time value that disables client keepalive.
230    */
231   public static final long KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
232 
233   /**
234    * The default delay in nanos for server keepalive.
235    */
236   public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
237 
238   /**
239    * The default timeout in nanos for a server keepalive ping request.
240    */
241   public static final long DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(20L);
242 
243   /**
244    * The magic keepalive time value that disables keepalive.
245    */
246   public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
247 
248   /**
249    * The default proxy detector.
250    */
251   public static final ProxyDetector DEFAULT_PROXY_DETECTOR = new ProxyDetectorImpl();
252 
253   /**
254    * A proxy detector that always claims no proxy is needed.
255    */
256   public static final ProxyDetector NOOP_PROXY_DETECTOR = new ProxyDetector() {
257     @Nullable
258     @Override
259     public ProxiedSocketAddress proxyFor(SocketAddress targetServerAddress) {
260       return null;
261     }
262   };
263 
264   /**
265    * The very default load-balancing policy.
266    */
267   public static final String DEFAULT_LB_POLICY = "pick_first";
268 
269   /**
270    * RPCs created on the Channel returned by {@link io.grpc.LoadBalancer.Subchannel#asChannel}
271    * will have this option with value {@code true}.  They will be treated differently from
272    * the ones created by application.
273    */
274   public static final CallOptions.Key<Boolean> CALL_OPTIONS_RPC_OWNED_BY_BALANCER =
275       CallOptions.Key.create("io.grpc.internal.CALL_OPTIONS_RPC_OWNED_BY_BALANCER");
276 
277   private static final ClientStreamTracer NOOP_TRACER = new ClientStreamTracer() {};
278 
279   /**
280    * Returns true if an RPC with the given properties should be counted when calculating the
281    * in-use state of a transport.
282    */
shouldBeCountedForInUse(CallOptions callOptions)283   public static boolean shouldBeCountedForInUse(CallOptions callOptions) {
284     return !Boolean.TRUE.equals(callOptions.getOption(CALL_OPTIONS_RPC_OWNED_BY_BALANCER));
285   }
286 
287   /**
288    * Maps HTTP error response status codes to transport codes, as defined in <a
289    * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md">
290    * http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is
291    * {@code true}.
292    */
httpStatusToGrpcStatus(int httpStatusCode)293   public static Status httpStatusToGrpcStatus(int httpStatusCode) {
294     return httpStatusToGrpcCode(httpStatusCode).toStatus()
295         .withDescription("HTTP status code " + httpStatusCode);
296   }
297 
httpStatusToGrpcCode(int httpStatusCode)298   private static Status.Code httpStatusToGrpcCode(int httpStatusCode) {
299     if (httpStatusCode >= 100 && httpStatusCode < 200) {
300       // 1xx. These headers should have been ignored.
301       return Status.Code.INTERNAL;
302     }
303     switch (httpStatusCode) {
304       case HttpURLConnection.HTTP_BAD_REQUEST:  // 400
305       case 431: // Request Header Fields Too Large
306         // TODO(carl-mastrangelo): this should be added to the http-grpc-status-mapping.md doc.
307         return Status.Code.INTERNAL;
308       case HttpURLConnection.HTTP_UNAUTHORIZED:  // 401
309         return Status.Code.UNAUTHENTICATED;
310       case HttpURLConnection.HTTP_FORBIDDEN:  // 403
311         return Status.Code.PERMISSION_DENIED;
312       case HttpURLConnection.HTTP_NOT_FOUND:  // 404
313         return Status.Code.UNIMPLEMENTED;
314       case 429:  // Too Many Requests
315       case HttpURLConnection.HTTP_BAD_GATEWAY:  // 502
316       case HttpURLConnection.HTTP_UNAVAILABLE:  // 503
317       case HttpURLConnection.HTTP_GATEWAY_TIMEOUT:  // 504
318         return Status.Code.UNAVAILABLE;
319       default:
320         return Status.Code.UNKNOWN;
321     }
322   }
323 
324   /**
325    * All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames.
326    */
327   public enum Http2Error {
328     /**
329      * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with
330      * {@code NO_ERROR}. In this case it is important to indicate to the application that the
331      * request should be retried (i.e. {@link Status#UNAVAILABLE}).
332      */
333     NO_ERROR(0x0, Status.UNAVAILABLE),
334     PROTOCOL_ERROR(0x1, Status.INTERNAL),
335     INTERNAL_ERROR(0x2, Status.INTERNAL),
336     FLOW_CONTROL_ERROR(0x3, Status.INTERNAL),
337     SETTINGS_TIMEOUT(0x4, Status.INTERNAL),
338     STREAM_CLOSED(0x5, Status.INTERNAL),
339     FRAME_SIZE_ERROR(0x6, Status.INTERNAL),
340     REFUSED_STREAM(0x7, Status.UNAVAILABLE),
341     CANCEL(0x8, Status.CANCELLED),
342     COMPRESSION_ERROR(0x9, Status.INTERNAL),
343     CONNECT_ERROR(0xA, Status.INTERNAL),
344     ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")),
345     INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as "
346         + "protocol is not secure enough to call")),
347     HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN);
348 
349     // Populate a mapping of code to enum value for quick look-up.
350     private static final Http2Error[] codeMap = buildHttp2CodeMap();
351 
buildHttp2CodeMap()352     private static Http2Error[] buildHttp2CodeMap() {
353       Http2Error[] errors = Http2Error.values();
354       int size = (int) errors[errors.length - 1].code() + 1;
355       Http2Error[] http2CodeMap = new Http2Error[size];
356       for (Http2Error error : errors) {
357         int index = (int) error.code();
358         http2CodeMap[index] = error;
359       }
360       return http2CodeMap;
361     }
362 
363     private final int code;
364     // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true
365     // when there are exceptions in the Status, which is not true here.
366     @SuppressWarnings("ImmutableEnumChecker")
367     private final Status status;
368 
Http2Error(int code, Status status)369     Http2Error(int code, Status status) {
370       this.code = code;
371       String description = "HTTP/2 error code: " + this.name();
372       if (status.getDescription() != null) {
373         description += " (" + status.getDescription() + ")";
374       }
375       this.status = status.withDescription(description);
376     }
377 
378     /**
379      * Gets the code for this error used on the wire.
380      */
code()381     public long code() {
382       return code;
383     }
384 
385     /**
386      * Gets the {@link Status} associated with this HTTP/2 code.
387      */
status()388     public Status status() {
389       return status;
390     }
391 
392     /**
393      * Looks up the HTTP/2 error code enum value for the specified code.
394      *
395      * @param code an HTTP/2 error code value.
396      * @return the HTTP/2 error code enum or {@code null} if not found.
397      */
forCode(long code)398     public static Http2Error forCode(long code) {
399       if (code >= codeMap.length || code < 0) {
400         return null;
401       }
402       return codeMap[(int) code];
403     }
404 
405     /**
406      * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code
407      * forCode(code).status()}, to more easily conform to HTTP/2:
408      *
409      * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior.
410      * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote>
411      *
412      * @param code the HTTP/2 error code.
413      * @return a {@link Status} representing the given error.
414      */
statusForCode(long code)415     public static Status statusForCode(long code) {
416       Http2Error error = forCode(code);
417       if (error == null) {
418         // This "forgets" the message of INTERNAL_ERROR while keeping the same status code.
419         Status.Code statusCode = INTERNAL_ERROR.status().getCode();
420         return Status.fromCodeValue(statusCode.value())
421             .withDescription("Unrecognized HTTP/2 error code: " + code);
422       }
423 
424       return error.status();
425     }
426   }
427 
428   /**
429    * Indicates whether or not the given value is a valid gRPC content-type.
430    */
isGrpcContentType(String contentType)431   public static boolean isGrpcContentType(String contentType) {
432     if (contentType == null) {
433       return false;
434     }
435 
436     if (CONTENT_TYPE_GRPC.length() > contentType.length()) {
437       return false;
438     }
439 
440     contentType = contentType.toLowerCase(Locale.US);
441     if (!contentType.startsWith(CONTENT_TYPE_GRPC)) {
442       // Not a gRPC content-type.
443       return false;
444     }
445 
446     if (contentType.length() == CONTENT_TYPE_GRPC.length()) {
447       // The strings match exactly.
448       return true;
449     }
450 
451     // The contentType matches, but is longer than the expected string.
452     // We need to support variations on the content-type (e.g. +proto, +json) as defined by the
453     // gRPC wire spec.
454     char nextChar = contentType.charAt(CONTENT_TYPE_GRPC.length());
455     return nextChar == '+' || nextChar == ';';
456   }
457 
458   /**
459    * Gets the User-Agent string for the gRPC transport.
460    */
getGrpcUserAgent( String transportName, @Nullable String applicationUserAgent)461   public static String getGrpcUserAgent(
462       String transportName, @Nullable String applicationUserAgent) {
463     StringBuilder builder = new StringBuilder();
464     if (applicationUserAgent != null) {
465       builder.append(applicationUserAgent);
466       builder.append(' ');
467     }
468     builder.append("grpc-java-");
469     builder.append(transportName);
470     builder.append('/');
471     builder.append(IMPLEMENTATION_VERSION);
472     return builder.toString();
473   }
474 
475   @Immutable
476   public static final class GrpcBuildVersion {
477     private final String userAgent;
478     private final String implementationVersion;
479 
GrpcBuildVersion(String userAgent, String implementationVersion)480     private GrpcBuildVersion(String userAgent, String implementationVersion) {
481       this.userAgent = Preconditions.checkNotNull(userAgent, "userAgentName");
482       this.implementationVersion =
483           Preconditions.checkNotNull(implementationVersion, "implementationVersion");
484     }
485 
getUserAgent()486     public String getUserAgent() {
487       return userAgent;
488     }
489 
getImplementationVersion()490     public String getImplementationVersion() {
491       return implementationVersion;
492     }
493 
494     @Override
toString()495     public String toString() {
496       return userAgent + " " + implementationVersion;
497     }
498   }
499 
500   /**
501    * Returns the build version of gRPC.
502    */
getGrpcBuildVersion()503   public static GrpcBuildVersion getGrpcBuildVersion() {
504     return new GrpcBuildVersion("gRPC Java", IMPLEMENTATION_VERSION);
505   }
506 
507   /**
508    * Parse an authority into a URI for retrieving the host and port.
509    */
authorityToUri(String authority)510   public static URI authorityToUri(String authority) {
511     Preconditions.checkNotNull(authority, "authority");
512     URI uri;
513     try {
514       uri = new URI(null, authority, null, null, null);
515     } catch (URISyntaxException ex) {
516       throw new IllegalArgumentException("Invalid authority: " + authority, ex);
517     }
518     return uri;
519   }
520 
521   /**
522    * Verify {@code authority} is valid for use with gRPC. The syntax must be valid and it must not
523    * include userinfo.
524    *
525    * @return the {@code authority} provided
526    */
checkAuthority(String authority)527   public static String checkAuthority(String authority) {
528     URI uri = authorityToUri(authority);
529     checkArgument(uri.getHost() != null, "No host in authority '%s'", authority);
530     checkArgument(uri.getUserInfo() == null,
531         "Userinfo must not be present on authority: '%s'", authority);
532     return authority;
533   }
534 
535   /**
536    * Combine a host and port into an authority string.
537    */
538   // There is a copy of this method in io.grpc.Grpc
authorityFromHostAndPort(String host, int port)539   public static String authorityFromHostAndPort(String host, int port) {
540     try {
541       return new URI(null, null, host, port, null, null, null).getAuthority();
542     } catch (URISyntaxException ex) {
543       throw new IllegalArgumentException("Invalid host or port: " + host + " " + port, ex);
544     }
545   }
546 
547   /**
548    * Shared executor for channels.
549    */
550   public static final Resource<Executor> SHARED_CHANNEL_EXECUTOR =
551       new Resource<Executor>() {
552         private static final String NAME = "grpc-default-executor";
553         @Override
554         public Executor create() {
555           return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
556         }
557 
558         @Override
559         public void close(Executor instance) {
560           ((ExecutorService) instance).shutdown();
561         }
562 
563         @Override
564         public String toString() {
565           return NAME;
566         }
567       };
568 
569   /**
570    * Shared single-threaded executor for managing channel timers.
571    */
572   public static final Resource<ScheduledExecutorService> TIMER_SERVICE =
573       new Resource<ScheduledExecutorService>() {
574         @Override
575         public ScheduledExecutorService create() {
576           // We don't use newSingleThreadScheduledExecutor because it doesn't return a
577           // ScheduledThreadPoolExecutor.
578           ScheduledExecutorService service = Executors.newScheduledThreadPool(
579               1,
580               getThreadFactory("grpc-timer-%d", true));
581 
582           // If there are long timeouts that are cancelled, they will not actually be removed from
583           // the executors queue.  This forces immediate removal upon cancellation to avoid a
584           // memory leak.  Reflection is used because we cannot use methods added in Java 1.7.  If
585           // the method does not exist, we give up.  Note that the method is not present in 1.6, but
586           // _is_ present in the android standard library.
587           try {
588             Method method = service.getClass().getMethod("setRemoveOnCancelPolicy", boolean.class);
589             method.invoke(service, true);
590           } catch (NoSuchMethodException e) {
591             // no op
592           } catch (RuntimeException e) {
593             throw e;
594           } catch (Exception e) {
595             throw new RuntimeException(e);
596           }
597 
598           return Executors.unconfigurableScheduledExecutorService(service);
599         }
600 
601         @Override
602         public void close(ScheduledExecutorService instance) {
603           instance.shutdown();
604         }
605       };
606 
607 
608   /**
609    * Get a {@link ThreadFactory} suitable for use in the current environment.
610    * @param nameFormat to apply to threads created by the factory.
611    * @param daemon {@code true} if the threads the factory creates are daemon threads, {@code false}
612    *     otherwise.
613    * @return a {@link ThreadFactory}.
614    */
getThreadFactory(String nameFormat, boolean daemon)615   public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
616     return new ThreadFactoryBuilder()
617         .setDaemon(daemon)
618         .setNameFormat(nameFormat)
619         .build();
620   }
621 
622   /**
623    * The factory of default Stopwatches.
624    */
625   public static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() {
626       @Override
627       public Stopwatch get() {
628         return Stopwatch.createUnstarted();
629       }
630     };
631 
632   /**
633    * Returns the host via {@link InetSocketAddress#getHostString} if it is possible,
634    * i.e. in jdk >= 7.
635    * Otherwise, return it via {@link InetSocketAddress#getHostName} which may incur a DNS lookup.
636    */
getHost(InetSocketAddress addr)637   public static String getHost(InetSocketAddress addr) {
638     try {
639       Method getHostStringMethod = InetSocketAddress.class.getMethod("getHostString");
640       return (String) getHostStringMethod.invoke(addr);
641     } catch (NoSuchMethodException e) {
642       // noop
643     } catch (IllegalAccessException e) {
644       // noop
645     } catch (InvocationTargetException e) {
646       // noop
647     }
648     return addr.getHostName();
649   }
650 
651   /**
652    * Marshals a nanoseconds representation of the timeout to and from a string representation,
653    * consisting of an ASCII decimal representation of a number with at most 8 digits, followed by a
654    * unit. Available units:
655    * n = nanoseconds
656    * u = microseconds
657    * m = milliseconds
658    * S = seconds
659    * M = minutes
660    * H = hours
661    *
662    * <p>The representation is greedy with respect to precision. That is, 2 seconds will be
663    * represented as `2000000u`.</p>
664    *
665    * <p>See <a href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests">the
666    * request header definition</a></p>
667    */
668   @VisibleForTesting
669   static class TimeoutMarshaller implements Metadata.AsciiMarshaller<Long> {
670 
671     @Override
toAsciiString(Long timeoutNanos)672     public String toAsciiString(Long timeoutNanos) {
673       long cutoff = 100000000;
674       TimeUnit unit = TimeUnit.NANOSECONDS;
675       if (timeoutNanos < 0) {
676         throw new IllegalArgumentException("Timeout too small");
677       } else if (timeoutNanos < cutoff) {
678         return timeoutNanos + "n";
679       } else if (timeoutNanos < cutoff * 1000L) {
680         return unit.toMicros(timeoutNanos) + "u";
681       } else if (timeoutNanos < cutoff * 1000L * 1000L) {
682         return unit.toMillis(timeoutNanos) + "m";
683       } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L) {
684         return unit.toSeconds(timeoutNanos) + "S";
685       } else if (timeoutNanos < cutoff * 1000L * 1000L * 1000L * 60L) {
686         return unit.toMinutes(timeoutNanos) + "M";
687       } else {
688         return unit.toHours(timeoutNanos) + "H";
689       }
690     }
691 
692     @Override
parseAsciiString(String serialized)693     public Long parseAsciiString(String serialized) {
694       checkArgument(serialized.length() > 0, "empty timeout");
695       checkArgument(serialized.length() <= 9, "bad timeout format");
696       long value = Long.parseLong(serialized.substring(0, serialized.length() - 1));
697       char unit = serialized.charAt(serialized.length() - 1);
698       switch (unit) {
699         case 'n':
700           return value;
701         case 'u':
702           return TimeUnit.MICROSECONDS.toNanos(value);
703         case 'm':
704           return TimeUnit.MILLISECONDS.toNanos(value);
705         case 'S':
706           return TimeUnit.SECONDS.toNanos(value);
707         case 'M':
708           return TimeUnit.MINUTES.toNanos(value);
709         case 'H':
710           return TimeUnit.HOURS.toNanos(value);
711         default:
712           throw new IllegalArgumentException(String.format("Invalid timeout unit: %s", unit));
713       }
714     }
715   }
716 
717   /**
718    * Returns a transport out of a PickResult, or {@code null} if the result is "buffer".
719    */
720   @Nullable
getTransportFromPickResult(PickResult result, boolean isWaitForReady)721   static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) {
722     final ClientTransport transport;
723     Subchannel subchannel = result.getSubchannel();
724     if (subchannel != null) {
725       transport = ((TransportProvider) subchannel.getInternalSubchannel()).obtainActiveTransport();
726     } else {
727       transport = null;
728     }
729     if (transport != null) {
730       final ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory();
731       if (streamTracerFactory == null) {
732         return transport;
733       }
734       return new ClientTransport() {
735         @Override
736         public ClientStream newStream(
737             MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
738             ClientStreamTracer[] tracers) {
739           StreamInfo info = StreamInfo.newBuilder().setCallOptions(callOptions).build();
740           ClientStreamTracer streamTracer =
741               streamTracerFactory.newClientStreamTracer(info, headers);
742           checkState(tracers[tracers.length - 1] == NOOP_TRACER, "lb tracer already assigned");
743           tracers[tracers.length - 1] = streamTracer;
744           return transport.newStream(method, headers, callOptions, tracers);
745         }
746 
747         @Override
748         public void ping(PingCallback callback, Executor executor) {
749           transport.ping(callback, executor);
750         }
751 
752         @Override
753         public InternalLogId getLogId() {
754           return transport.getLogId();
755         }
756 
757         @Override
758         public ListenableFuture<SocketStats> getStats() {
759           return transport.getStats();
760         }
761       };
762     }
763     if (!result.getStatus().isOk()) {
764       if (result.isDrop()) {
765         return new FailingClientTransport(
766             replaceInappropriateControlPlaneStatus(result.getStatus()), RpcProgress.DROPPED);
767       }
768       if (!isWaitForReady) {
769         return new FailingClientTransport(
770             replaceInappropriateControlPlaneStatus(result.getStatus()), RpcProgress.PROCESSED);
771       }
772     }
773     return null;
774   }
775 
776   /** Gets stream tracers based on CallOptions. */
777   public static ClientStreamTracer[] getClientStreamTracers(
778       CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) {
779     List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
780     ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1];
781     StreamInfo streamInfo = StreamInfo.newBuilder()
782         .setCallOptions(callOptions)
783         .setPreviousAttempts(previousAttempts)
784         .setIsTransparentRetry(isTransparentRetry)
785         .build();
786     for (int i = 0; i < factories.size(); i++) {
787       tracers[i] = factories.get(i).newClientStreamTracer(streamInfo, headers);
788     }
789     // Reserved to be set later by the lb as per the API contract of ClientTransport.newStream().
790     // See also GrpcUtil.getTransportFromPickResult()
791     tracers[tracers.length - 1] = NOOP_TRACER;
792     return tracers;
793   }
794 
795   /** Quietly closes all messages in MessageProducer. */
796   static void closeQuietly(MessageProducer producer) {
797     InputStream message;
798     while ((message = producer.next()) != null) {
799       closeQuietly(message);
800     }
801   }
802 
803   /**
804    * Closes a Closeable, ignoring IOExceptions.
805    * This method exists because Guava's {@code Closeables.closeQuietly()} is beta.
806    */
807   public static void closeQuietly(@Nullable Closeable message) {
808     if (message == null) {
809       return;
810     }
811     try {
812       message.close();
813     } catch (IOException ioException) {
814       // do nothing except log
815       log.log(Level.WARNING, "exception caught in closeQuietly", ioException);
816     }
817   }
818 
819   /** Reads {@code in} until end of stream. */
820   public static void exhaust(InputStream in) throws IOException {
821     byte[] buf = new byte[256];
822     while (in.read(buf) != -1) {}
823   }
824 
825   /**
826    * Some status codes from the control plane are not appropritate to use in the data plane. If one
827    * is given it will be replaced with INTERNAL, indicating a bug in the control plane
828    * implementation.
829    */
830   public static Status replaceInappropriateControlPlaneStatus(Status status) {
831     checkArgument(status != null);
832     return INAPPROPRIATE_CONTROL_PLANE_STATUS.contains(status.getCode())
833         ? Status.INTERNAL.withDescription(
834         "Inappropriate status code from control plane: " + status.getCode() + " "
835             + status.getDescription()).withCause(status.getCause()) : status;
836   }
837 
838   /**
839    * Checks whether the given item exists in the iterable.  This is copied from Guava Collect's
840    * {@code Iterables.contains()} because Guava Collect is not Android-friendly thus core can't
841    * depend on it.
842    */
843   static <T> boolean iterableContains(Iterable<T> iterable, T item) {
844     if (iterable instanceof Collection) {
845       Collection<?> collection = (Collection<?>) iterable;
846       try {
847         return collection.contains(item);
848       } catch (NullPointerException e) {
849         return false;
850       } catch (ClassCastException e) {
851         return false;
852       }
853     }
854     for (T i : iterable) {
855       if (Objects.equal(i, item)) {
856         return true;
857       }
858     }
859     return false;
860   }
861 
862   private GrpcUtil() {}
863 }
864