• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21 import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.MoreObjects;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Stopwatch;
27 import com.google.common.base.Supplier;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.SettableFuture;
30 import io.grpc.Attributes;
31 import io.grpc.CallOptions;
32 import io.grpc.ClientStreamTracer;
33 import io.grpc.Grpc;
34 import io.grpc.HttpConnectProxiedSocketAddress;
35 import io.grpc.InternalChannelz;
36 import io.grpc.InternalChannelz.SocketStats;
37 import io.grpc.InternalLogId;
38 import io.grpc.Metadata;
39 import io.grpc.MethodDescriptor;
40 import io.grpc.MethodDescriptor.MethodType;
41 import io.grpc.SecurityLevel;
42 import io.grpc.Status;
43 import io.grpc.Status.Code;
44 import io.grpc.StatusException;
45 import io.grpc.internal.ClientStreamListener.RpcProgress;
46 import io.grpc.internal.ConnectionClientTransport;
47 import io.grpc.internal.GrpcAttributes;
48 import io.grpc.internal.GrpcUtil;
49 import io.grpc.internal.Http2Ping;
50 import io.grpc.internal.InUseStateAggregator;
51 import io.grpc.internal.KeepAliveManager;
52 import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
53 import io.grpc.internal.SerializingExecutor;
54 import io.grpc.internal.StatsTraceContext;
55 import io.grpc.internal.TransportTracer;
56 import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
57 import io.grpc.okhttp.internal.ConnectionSpec;
58 import io.grpc.okhttp.internal.Credentials;
59 import io.grpc.okhttp.internal.StatusLine;
60 import io.grpc.okhttp.internal.framed.ErrorCode;
61 import io.grpc.okhttp.internal.framed.FrameReader;
62 import io.grpc.okhttp.internal.framed.FrameWriter;
63 import io.grpc.okhttp.internal.framed.Header;
64 import io.grpc.okhttp.internal.framed.HeadersMode;
65 import io.grpc.okhttp.internal.framed.Http2;
66 import io.grpc.okhttp.internal.framed.Settings;
67 import io.grpc.okhttp.internal.framed.Variant;
68 import io.grpc.okhttp.internal.proxy.HttpUrl;
69 import io.grpc.okhttp.internal.proxy.Request;
70 import io.perfmark.PerfMark;
71 import java.io.EOFException;
72 import java.io.IOException;
73 import java.net.InetSocketAddress;
74 import java.net.Socket;
75 import java.net.URI;
76 import java.util.Collections;
77 import java.util.Deque;
78 import java.util.EnumMap;
79 import java.util.HashMap;
80 import java.util.Iterator;
81 import java.util.LinkedList;
82 import java.util.List;
83 import java.util.Locale;
84 import java.util.Map;
85 import java.util.Random;
86 import java.util.concurrent.CountDownLatch;
87 import java.util.concurrent.Executor;
88 import java.util.concurrent.ScheduledExecutorService;
89 import java.util.logging.Level;
90 import java.util.logging.Logger;
91 import javax.annotation.Nullable;
92 import javax.annotation.concurrent.GuardedBy;
93 import javax.net.SocketFactory;
94 import javax.net.ssl.HostnameVerifier;
95 import javax.net.ssl.SSLSession;
96 import javax.net.ssl.SSLSocket;
97 import javax.net.ssl.SSLSocketFactory;
98 import okio.Buffer;
99 import okio.BufferedSink;
100 import okio.BufferedSource;
101 import okio.ByteString;
102 import okio.Okio;
103 import okio.Source;
104 import okio.Timeout;
105 
106 /**
107  * A okhttp-based {@link ConnectionClientTransport} implementation.
108  */
109 class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
110       OutboundFlowController.Transport {
111   private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
112   private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
113 
buildErrorCodeToStatusMap()114   private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
115     Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
116     errorToStatus.put(ErrorCode.NO_ERROR,
117         Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
118     errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
119         Status.INTERNAL.withDescription("Protocol error"));
120     errorToStatus.put(ErrorCode.INTERNAL_ERROR,
121         Status.INTERNAL.withDescription("Internal error"));
122     errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
123         Status.INTERNAL.withDescription("Flow control error"));
124     errorToStatus.put(ErrorCode.STREAM_CLOSED,
125         Status.INTERNAL.withDescription("Stream closed"));
126     errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
127         Status.INTERNAL.withDescription("Frame too large"));
128     errorToStatus.put(ErrorCode.REFUSED_STREAM,
129         Status.UNAVAILABLE.withDescription("Refused stream"));
130     errorToStatus.put(ErrorCode.CANCEL,
131         Status.CANCELLED.withDescription("Cancelled"));
132     errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
133         Status.INTERNAL.withDescription("Compression error"));
134     errorToStatus.put(ErrorCode.CONNECT_ERROR,
135         Status.INTERNAL.withDescription("Connect error"));
136     errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
137         Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
138     errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
139         Status.PERMISSION_DENIED.withDescription("Inadequate security"));
140     return Collections.unmodifiableMap(errorToStatus);
141   }
142 
143   private final InetSocketAddress address;
144   private final String defaultAuthority;
145   private final String userAgent;
146   private final Random random = new Random();
147   // Returns new unstarted stopwatches
148   private final Supplier<Stopwatch> stopwatchFactory;
149   private final int initialWindowSize;
150   private final Variant variant;
151   private Listener listener;
152   @GuardedBy("lock")
153   private ExceptionHandlingFrameWriter frameWriter;
154   private OutboundFlowController outboundFlow;
155   private final Object lock = new Object();
156   private final InternalLogId logId;
157   @GuardedBy("lock")
158   private int nextStreamId;
159   @GuardedBy("lock")
160   private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
161   private final Executor executor;
162   // Wrap on executor, to guarantee some operations be executed serially.
163   private final SerializingExecutor serializingExecutor;
164   private final ScheduledExecutorService scheduler;
165   private final int maxMessageSize;
166   private int connectionUnacknowledgedBytesRead;
167   private ClientFrameHandler clientFrameHandler;
168   // Caution: Not synchronized, new value can only be safely read after the connection is complete.
169   private Attributes attributes;
170   /**
171    * Indicates the transport is in go-away state: no new streams will be processed, but existing
172    * streams may continue.
173    */
174   @GuardedBy("lock")
175   private Status goAwayStatus;
176   @GuardedBy("lock")
177   private boolean goAwaySent;
178   @GuardedBy("lock")
179   private Http2Ping ping;
180   @GuardedBy("lock")
181   private boolean stopped;
182   @GuardedBy("lock")
183   private boolean hasStream;
184   private final SocketFactory socketFactory;
185   private SSLSocketFactory sslSocketFactory;
186   private HostnameVerifier hostnameVerifier;
187   private Socket socket;
188   @GuardedBy("lock")
189   private int maxConcurrentStreams = 0;
190   @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
191   @GuardedBy("lock")
192   private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
193   private final ConnectionSpec connectionSpec;
194   private KeepAliveManager keepAliveManager;
195   private boolean enableKeepAlive;
196   private long keepAliveTimeNanos;
197   private long keepAliveTimeoutNanos;
198   private boolean keepAliveWithoutCalls;
199   private final Runnable tooManyPingsRunnable;
200   private final int maxInboundMetadataSize;
201   private final boolean useGetForSafeMethods;
202   @GuardedBy("lock")
203   private final TransportTracer transportTracer;
204   @GuardedBy("lock")
205   private final InUseStateAggregator<OkHttpClientStream> inUseState =
206       new InUseStateAggregator<OkHttpClientStream>() {
207         @Override
208         protected void handleInUse() {
209           listener.transportInUse(true);
210         }
211 
212         @Override
213         protected void handleNotInUse() {
214           listener.transportInUse(false);
215         }
216       };
217   @GuardedBy("lock")
218   private InternalChannelz.Security securityInfo;
219 
220   @VisibleForTesting
221   @Nullable
222   final HttpConnectProxiedSocketAddress proxiedAddr;
223 
224   @VisibleForTesting
225   int proxySocketTimeout = 30000;
226 
227   // The following fields should only be used for test.
228   Runnable connectingCallback;
229   SettableFuture<Void> connectedFuture;
230 
OkHttpClientTransport( OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, InetSocketAddress address, String authority, @Nullable String userAgent, Attributes eagAttrs, @Nullable HttpConnectProxiedSocketAddress proxiedAddr, Runnable tooManyPingsRunnable)231   public OkHttpClientTransport(
232       OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
233       InetSocketAddress address,
234       String authority,
235       @Nullable String userAgent,
236       Attributes eagAttrs,
237       @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
238       Runnable tooManyPingsRunnable) {
239     this(
240         transportFactory,
241         address,
242         authority,
243         userAgent,
244         eagAttrs,
245         GrpcUtil.STOPWATCH_SUPPLIER,
246         new Http2(),
247         proxiedAddr,
248         tooManyPingsRunnable);
249   }
250 
OkHttpClientTransport( OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, InetSocketAddress address, String authority, @Nullable String userAgent, Attributes eagAttrs, Supplier<Stopwatch> stopwatchFactory, Variant variant, @Nullable HttpConnectProxiedSocketAddress proxiedAddr, Runnable tooManyPingsRunnable)251   private OkHttpClientTransport(
252       OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
253       InetSocketAddress address,
254       String authority,
255       @Nullable String userAgent,
256       Attributes eagAttrs,
257       Supplier<Stopwatch> stopwatchFactory,
258       Variant variant,
259       @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
260       Runnable tooManyPingsRunnable) {
261     this.address = Preconditions.checkNotNull(address, "address");
262     this.defaultAuthority = authority;
263     this.maxMessageSize = transportFactory.maxMessageSize;
264     this.initialWindowSize = transportFactory.flowControlWindow;
265     this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
266     serializingExecutor = new SerializingExecutor(transportFactory.executor);
267     this.scheduler = Preconditions.checkNotNull(
268         transportFactory.scheduledExecutorService, "scheduledExecutorService");
269     // Client initiated streams are odd, server initiated ones are even. Server should not need to
270     // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
271     nextStreamId = 3;
272     this.socketFactory = transportFactory.socketFactory == null
273         ? SocketFactory.getDefault() : transportFactory.socketFactory;
274     this.sslSocketFactory = transportFactory.sslSocketFactory;
275     this.hostnameVerifier = transportFactory.hostnameVerifier;
276     this.connectionSpec = Preconditions.checkNotNull(
277         transportFactory.connectionSpec, "connectionSpec");
278     this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
279     this.variant = Preconditions.checkNotNull(variant, "variant");
280     this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
281     this.proxiedAddr = proxiedAddr;
282     this.tooManyPingsRunnable =
283         Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
284     this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
285     this.transportTracer = transportFactory.transportTracerFactory.create();
286     this.logId = InternalLogId.allocate(getClass(), address.toString());
287     this.attributes = Attributes.newBuilder()
288         .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
289     this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
290     initTransportTracer();
291   }
292 
293   /**
294    * Create a transport connected to a fake peer for test.
295    */
296   @VisibleForTesting
OkHttpClientTransport( OkHttpChannelBuilder.OkHttpTransportFactory transportFactory, String userAgent, Supplier<Stopwatch> stopwatchFactory, Variant variant, @Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture, Runnable tooManyPingsRunnable)297   OkHttpClientTransport(
298       OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
299       String userAgent,
300       Supplier<Stopwatch> stopwatchFactory,
301       Variant variant,
302       @Nullable Runnable connectingCallback,
303       SettableFuture<Void> connectedFuture,
304       Runnable tooManyPingsRunnable) {
305     this(
306         transportFactory,
307         new InetSocketAddress("127.0.0.1", 80),
308         "notarealauthority:80",
309         userAgent,
310         Attributes.EMPTY,
311         stopwatchFactory,
312         variant,
313         null,
314         tooManyPingsRunnable);
315     this.connectingCallback = connectingCallback;
316     this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
317   }
318 
319   // sslSocketFactory is set to null when use plaintext.
isUsingPlaintext()320   boolean isUsingPlaintext() {
321     return sslSocketFactory == null;
322   }
323 
initTransportTracer()324   private void initTransportTracer() {
325     synchronized (lock) { // to make @GuardedBy linter happy
326       transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
327         @Override
328         public TransportTracer.FlowControlWindows read() {
329           synchronized (lock) {
330             long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
331             // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
332             // provide a lower bound.
333             long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
334             return new TransportTracer.FlowControlWindows(local, remote);
335           }
336         }
337       });
338     }
339   }
340 
341   /**
342    * Enable keepalive with custom delay and timeout.
343    */
enableKeepAlive(boolean enable, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls)344   void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
345       long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
346     enableKeepAlive = enable;
347     this.keepAliveTimeNanos = keepAliveTimeNanos;
348     this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
349     this.keepAliveWithoutCalls = keepAliveWithoutCalls;
350   }
351 
352   @Override
ping(final PingCallback callback, Executor executor)353   public void ping(final PingCallback callback, Executor executor) {
354     long data = 0;
355     Http2Ping p;
356     boolean writePing;
357     synchronized (lock) {
358       checkState(frameWriter != null);
359       if (stopped) {
360         Http2Ping.notifyFailed(callback, executor, getPingFailure());
361         return;
362       }
363       if (ping != null) {
364         // we only allow one outstanding ping at a time, so just add the callback to
365         // any outstanding operation
366         p = ping;
367         writePing = false;
368       } else {
369         // set outstanding operation and then write the ping after releasing lock
370         data = random.nextLong();
371         Stopwatch stopwatch = stopwatchFactory.get();
372         stopwatch.start();
373         p = ping = new Http2Ping(data, stopwatch);
374         writePing = true;
375         transportTracer.reportKeepAliveSent();
376       }
377       if (writePing) {
378         frameWriter.ping(false, (int) (data >>> 32), (int) data);
379       }
380     }
381     // If transport concurrently failed/stopped since we released the lock above, this could
382     // immediately invoke callback (which we shouldn't do while holding a lock)
383     p.addCallback(callback, executor);
384   }
385 
386   @Override
newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions, ClientStreamTracer[] tracers)387   public OkHttpClientStream newStream(
388       MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
389       ClientStreamTracer[] tracers) {
390     Preconditions.checkNotNull(method, "method");
391     Preconditions.checkNotNull(headers, "headers");
392     StatsTraceContext statsTraceContext =
393         StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
394     // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
395     synchronized (lock) { // to make @GuardedBy linter happy
396       return new OkHttpClientStream(
397           method,
398           headers,
399           frameWriter,
400           OkHttpClientTransport.this,
401           outboundFlow,
402           lock,
403           maxMessageSize,
404           initialWindowSize,
405           defaultAuthority,
406           userAgent,
407           statsTraceContext,
408           transportTracer,
409           callOptions,
410           useGetForSafeMethods);
411     }
412   }
413 
414   @GuardedBy("lock")
streamReadyToStart(OkHttpClientStream clientStream)415   void streamReadyToStart(OkHttpClientStream clientStream) {
416     if (goAwayStatus != null) {
417       clientStream.transportState().transportReportStatus(
418           goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
419     } else if (streams.size() >= maxConcurrentStreams) {
420       pendingStreams.add(clientStream);
421       setInUse(clientStream);
422     } else {
423       startStream(clientStream);
424     }
425   }
426 
427   @SuppressWarnings("GuardedBy")
428   @GuardedBy("lock")
startStream(OkHttpClientStream stream)429   private void startStream(OkHttpClientStream stream) {
430     Preconditions.checkState(
431         stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
432     streams.put(nextStreamId, stream);
433     setInUse(stream);
434     // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
435     // found: 'this.lock'
436     stream.transportState().start(nextStreamId);
437     // For unary and server streaming, there will be a data frame soon, no need to flush the header.
438     if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
439         || stream.useGet()) {
440       frameWriter.flush();
441     }
442     if (nextStreamId >= Integer.MAX_VALUE - 2) {
443       // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
444       // correctly.
445       nextStreamId = Integer.MAX_VALUE;
446       startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
447           Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
448     } else {
449       nextStreamId += 2;
450     }
451   }
452 
453   /**
454    * Starts pending streams, returns true if at least one pending stream is started.
455    */
456   @GuardedBy("lock")
startPendingStreams()457   private boolean startPendingStreams() {
458     boolean hasStreamStarted = false;
459     while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
460       OkHttpClientStream stream = pendingStreams.poll();
461       startStream(stream);
462       hasStreamStarted = true;
463     }
464     return hasStreamStarted;
465   }
466 
467   /**
468    * Removes given pending stream, used when a pending stream is cancelled.
469    */
470   @GuardedBy("lock")
removePendingStream(OkHttpClientStream pendingStream)471   void removePendingStream(OkHttpClientStream pendingStream) {
472     pendingStreams.remove(pendingStream);
473     maybeClearInUse(pendingStream);
474   }
475 
476   @Override
start(Listener listener)477   public Runnable start(Listener listener) {
478     this.listener = Preconditions.checkNotNull(listener, "listener");
479 
480     if (enableKeepAlive) {
481       keepAliveManager = new KeepAliveManager(
482           new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
483           keepAliveWithoutCalls);
484       keepAliveManager.onTransportStarted();
485     }
486 
487     int maxQueuedControlFrames = 10000;
488     final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
489     FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
490         variant.newWriter(Okio.buffer(asyncSink), true));
491 
492     synchronized (lock) {
493       // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
494       // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
495       // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
496       // the same TransportExceptionHandler instance so it is all mixed back together.
497       frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
498       outboundFlow = new OutboundFlowController(this, frameWriter);
499     }
500     final CountDownLatch latch = new CountDownLatch(1);
501     // Connecting in the serializingExecutor, so that some stream operations like synStream
502     // will be executed after connected.
503     serializingExecutor.execute(new Runnable() {
504       @Override
505       public void run() {
506         // This is a hack to make sure the connection preface and initial settings to be sent out
507         // without blocking the start. By doing this essentially prevents potential deadlock when
508         // network is not available during startup while another thread holding lock to send the
509         // initial preface.
510         try {
511           latch.await();
512         } catch (InterruptedException e) {
513           Thread.currentThread().interrupt();
514         }
515         // Use closed source on failure so that the reader immediately shuts down.
516         BufferedSource source = Okio.buffer(new Source() {
517           @Override
518           public long read(Buffer sink, long byteCount) {
519             return -1;
520           }
521 
522           @Override
523           public Timeout timeout() {
524             return Timeout.NONE;
525           }
526 
527           @Override
528           public void close() {
529           }
530         });
531         Socket sock;
532         SSLSession sslSession = null;
533         try {
534           if (proxiedAddr == null) {
535             sock = socketFactory.createSocket(address.getAddress(), address.getPort());
536           } else {
537             if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
538               sock = createHttpProxySocket(
539                   proxiedAddr.getTargetAddress(),
540                   (InetSocketAddress) proxiedAddr.getProxyAddress(),
541                   proxiedAddr.getUsername(),
542                   proxiedAddr.getPassword()
543               );
544             } else {
545               throw Status.INTERNAL.withDescription(
546                   "Unsupported SocketAddress implementation "
547                   + proxiedAddr.getProxyAddress().getClass()).asException();
548             }
549           }
550           if (sslSocketFactory != null) {
551             SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
552                 sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
553                 connectionSpec);
554             sslSession = sslSocket.getSession();
555             sock = sslSocket;
556           }
557           sock.setTcpNoDelay(true);
558           source = Okio.buffer(Okio.source(sock));
559           asyncSink.becomeConnected(Okio.sink(sock), sock);
560 
561           // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
562           attributes = attributes.toBuilder()
563               .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
564               .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
565               .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
566               .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
567                   sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
568               .build();
569         } catch (StatusException e) {
570           startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
571           return;
572         } catch (Exception e) {
573           onException(e);
574           return;
575         } finally {
576           clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
577         }
578         synchronized (lock) {
579           socket = Preconditions.checkNotNull(sock, "socket");
580           if (sslSession != null) {
581             securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
582           }
583         }
584       }
585     });
586     // Schedule to send connection preface & settings before any other write.
587     try {
588       sendConnectionPrefaceAndSettings();
589     } finally {
590       latch.countDown();
591     }
592 
593     serializingExecutor.execute(new Runnable() {
594       @Override
595       public void run() {
596         if (connectingCallback != null) {
597           connectingCallback.run();
598         }
599         // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
600         // may send goAway immediately.
601         executor.execute(clientFrameHandler);
602         synchronized (lock) {
603           maxConcurrentStreams = Integer.MAX_VALUE;
604           startPendingStreams();
605         }
606         if (connectedFuture != null) {
607           connectedFuture.set(null);
608         }
609       }
610     });
611     return null;
612   }
613 
614   /**
615    * Should only be called once when the transport is first established.
616    */
sendConnectionPrefaceAndSettings()617   private void sendConnectionPrefaceAndSettings() {
618     synchronized (lock) {
619       frameWriter.connectionPreface();
620       Settings settings = new Settings();
621       OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
622       frameWriter.settings(settings);
623       if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
624         frameWriter.windowUpdate(
625                 Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
626       }
627     }
628   }
629 
createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword)630   private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
631       String proxyUsername, String proxyPassword) throws StatusException {
632     Socket sock = null;
633     try {
634       // The proxy address may not be resolved
635       if (proxyAddress.getAddress() != null) {
636         sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
637       } else {
638         sock =
639             socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
640       }
641       sock.setTcpNoDelay(true);
642       // A socket timeout is needed because lost network connectivity while reading from the proxy,
643       // can cause reading from the socket to hang.
644       sock.setSoTimeout(proxySocketTimeout);
645 
646       Source source = Okio.source(sock);
647       BufferedSink sink = Okio.buffer(Okio.sink(sock));
648 
649       // Prepare headers and request method line
650       Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
651       HttpUrl url = proxyRequest.httpUrl();
652       String requestLine =
653           String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
654 
655       // Write request to socket
656       sink.writeUtf8(requestLine).writeUtf8("\r\n");
657       for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
658         sink.writeUtf8(proxyRequest.headers().name(i))
659             .writeUtf8(": ")
660             .writeUtf8(proxyRequest.headers().value(i))
661             .writeUtf8("\r\n");
662       }
663       sink.writeUtf8("\r\n");
664       // Flush buffer (flushes socket and sends request)
665       sink.flush();
666 
667       // Read status line, check if 2xx was returned
668       StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
669       // Drain rest of headers
670       while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
671       if (statusLine.code < 200 || statusLine.code >= 300) {
672         Buffer body = new Buffer();
673         try {
674           sock.shutdownOutput();
675           source.read(body, 1024);
676         } catch (IOException ex) {
677           body.writeUtf8("Unable to read body: " + ex.toString());
678         }
679         try {
680           sock.close();
681         } catch (IOException ignored) {
682           // ignored
683         }
684         String message = String.format(
685             Locale.US,
686             "Response returned from proxy was not successful (expected 2xx, got %d %s). "
687               + "Response body:\n%s",
688             statusLine.code, statusLine.message, body.readUtf8());
689         throw Status.UNAVAILABLE.withDescription(message).asException();
690       }
691       // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
692       sock.setSoTimeout(0);
693       return sock;
694     } catch (IOException e) {
695       if (sock != null) {
696         GrpcUtil.closeQuietly(sock);
697       }
698       throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
699           .asException();
700     }
701   }
702 
createHttpProxyRequest(InetSocketAddress address, String proxyUsername, String proxyPassword)703   private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
704                                          String proxyPassword) {
705     HttpUrl tunnelUrl = new HttpUrl.Builder()
706         .scheme("https")
707         .host(address.getHostName())
708         .port(address.getPort())
709         .build();
710 
711     Request.Builder request = new Request.Builder()
712         .url(tunnelUrl)
713         .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
714         .header("User-Agent", userAgent);
715 
716     // If we have proxy credentials, set them right away
717     if (proxyUsername != null && proxyPassword != null) {
718       request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
719     }
720     return request.build();
721   }
722 
readUtf8LineStrictUnbuffered(Source source)723   private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
724     Buffer buffer = new Buffer();
725     while (true) {
726       if (source.read(buffer, 1) == -1) {
727         throw new EOFException("\\n not found: " + buffer.readByteString().hex());
728       }
729       if (buffer.getByte(buffer.size() - 1) == '\n') {
730         return buffer.readUtf8LineStrict();
731       }
732     }
733   }
734 
735   @Override
toString()736   public String toString() {
737     return MoreObjects.toStringHelper(this)
738         .add("logId", logId.getId())
739         .add("address", address)
740         .toString();
741   }
742 
743   @Override
getLogId()744   public InternalLogId getLogId() {
745     return logId;
746   }
747 
748   /**
749    * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
750    * authority, uri.getHost() will (rightly) return null, since the authority is no longer
751    * an actual service.  This method overrides the behavior for practical reasons.  For example,
752    * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
753    * we return the input.  This is because the return value, in conjunction with getOverridenPort,
754    * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
755    * connection to the port, independent of this function.
756    *
757    * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
758    * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
759    * authorities do not have ports, so this is even more wrong than before.  Sorry.
760    */
761   @VisibleForTesting
getOverridenHost()762   String getOverridenHost() {
763     URI uri = GrpcUtil.authorityToUri(defaultAuthority);
764     if (uri.getHost() != null) {
765       return uri.getHost();
766     }
767 
768     return defaultAuthority;
769   }
770 
771   @VisibleForTesting
getOverridenPort()772   int getOverridenPort() {
773     URI uri = GrpcUtil.authorityToUri(defaultAuthority);
774     if (uri.getPort() != -1) {
775       return uri.getPort();
776     }
777 
778     return address.getPort();
779   }
780 
781   @Override
shutdown(Status reason)782   public void shutdown(Status reason) {
783     synchronized (lock) {
784       if (goAwayStatus != null) {
785         return;
786       }
787 
788       goAwayStatus = reason;
789       listener.transportShutdown(goAwayStatus);
790       stopIfNecessary();
791     }
792   }
793 
794   @Override
shutdownNow(Status reason)795   public void shutdownNow(Status reason) {
796     shutdown(reason);
797     synchronized (lock) {
798       Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
799       while (it.hasNext()) {
800         Map.Entry<Integer, OkHttpClientStream> entry = it.next();
801         it.remove();
802         entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
803         maybeClearInUse(entry.getValue());
804       }
805 
806       for (OkHttpClientStream stream : pendingStreams) {
807         // in cases such as the connection fails to ACK keep-alive, pending streams should have a
808         // chance to retry and be routed to another connection.
809         stream.transportState().transportReportStatus(
810             reason, RpcProgress.MISCARRIED, true, new Metadata());
811         maybeClearInUse(stream);
812       }
813       pendingStreams.clear();
814 
815       stopIfNecessary();
816     }
817   }
818 
819   @Override
getAttributes()820   public Attributes getAttributes() {
821     return attributes;
822   }
823 
824   /**
825    * Gets all active streams as an array.
826    */
827   @Override
getActiveStreams()828   public OutboundFlowController.StreamState[] getActiveStreams() {
829     synchronized (lock) {
830       OutboundFlowController.StreamState[] flowStreams =
831           new OutboundFlowController.StreamState[streams.size()];
832       int i = 0;
833       for (OkHttpClientStream stream : streams.values()) {
834         flowStreams[i++] = stream.transportState().getOutboundFlowState();
835       }
836       return flowStreams;
837     }
838   }
839 
840   @VisibleForTesting
getHandler()841   ClientFrameHandler getHandler() {
842     return clientFrameHandler;
843   }
844 
845   @VisibleForTesting
getSocketFactory()846   SocketFactory getSocketFactory() {
847     return socketFactory;
848   }
849 
850   @VisibleForTesting
getPendingStreamSize()851   int getPendingStreamSize() {
852     synchronized (lock) {
853       return pendingStreams.size();
854     }
855   }
856 
857   @VisibleForTesting
setNextStreamId(int nextStreamId)858   void setNextStreamId(int nextStreamId) {
859     synchronized (lock) {
860       this.nextStreamId = nextStreamId;
861     }
862   }
863 
864   /**
865    * Finish all active streams due to an IOException, then close the transport.
866    */
867   @Override
onException(Throwable failureCause)868   public void onException(Throwable failureCause) {
869     Preconditions.checkNotNull(failureCause, "failureCause");
870     Status status = Status.UNAVAILABLE.withCause(failureCause);
871     startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
872   }
873 
874   /**
875    * Send GOAWAY to the server, then finish all active streams and close the transport.
876    */
onError(ErrorCode errorCode, String moreDetail)877   private void onError(ErrorCode errorCode, String moreDetail) {
878     startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
879   }
880 
startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status)881   private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
882     synchronized (lock) {
883       if (goAwayStatus == null) {
884         goAwayStatus = status;
885         listener.transportShutdown(status);
886       }
887       if (errorCode != null && !goAwaySent) {
888         // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
889         // streams. The GOAWAY is part of graceful shutdown.
890         goAwaySent = true;
891         frameWriter.goAway(0, errorCode, new byte[0]);
892       }
893 
894       Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
895       while (it.hasNext()) {
896         Map.Entry<Integer, OkHttpClientStream> entry = it.next();
897         if (entry.getKey() > lastKnownStreamId) {
898           it.remove();
899           entry.getValue().transportState().transportReportStatus(
900               status, RpcProgress.REFUSED, false, new Metadata());
901           maybeClearInUse(entry.getValue());
902         }
903       }
904 
905       for (OkHttpClientStream stream : pendingStreams) {
906         stream.transportState().transportReportStatus(
907             status, RpcProgress.MISCARRIED, true, new Metadata());
908         maybeClearInUse(stream);
909       }
910       pendingStreams.clear();
911 
912       stopIfNecessary();
913     }
914   }
915 
916   /**
917    * Called when a stream is closed. We do things like:
918    * <ul>
919    * <li>Removing the stream from the map.
920    * <li>Optionally reporting the status.
921    * <li>Starting pending streams if we can.
922    * <li>Stopping the transport if this is the last live stream under a go-away status.
923    * </ul>
924    *
925    * @param streamId the Id of the stream.
926    * @param status the final status of this stream, null means no need to report.
927    * @param stopDelivery interrupt queued messages in the deframer
928    * @param errorCode reset the stream with this ErrorCode if not null.
929    * @param trailers the trailers received if not null
930    */
finishStream( int streamId, @Nullable Status status, RpcProgress rpcProgress, boolean stopDelivery, @Nullable ErrorCode errorCode, @Nullable Metadata trailers)931   void finishStream(
932       int streamId,
933       @Nullable Status status,
934       RpcProgress rpcProgress,
935       boolean stopDelivery,
936       @Nullable ErrorCode errorCode,
937       @Nullable Metadata trailers) {
938     synchronized (lock) {
939       OkHttpClientStream stream = streams.remove(streamId);
940       if (stream != null) {
941         if (errorCode != null) {
942           frameWriter.rstStream(streamId, ErrorCode.CANCEL);
943         }
944         if (status != null) {
945           stream
946               .transportState()
947               .transportReportStatus(
948                   status,
949                   rpcProgress,
950                   stopDelivery,
951                   trailers != null ? trailers : new Metadata());
952         }
953         if (!startPendingStreams()) {
954           stopIfNecessary();
955           maybeClearInUse(stream);
956         }
957       }
958     }
959   }
960 
961   /**
962    * When the transport is in goAway state, we should stop it once all active streams finish.
963    */
964   @GuardedBy("lock")
stopIfNecessary()965   private void stopIfNecessary() {
966     if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
967       return;
968     }
969     if (stopped) {
970       return;
971     }
972     stopped = true;
973 
974     if (keepAliveManager != null) {
975       keepAliveManager.onTransportTermination();
976     }
977 
978     if (ping != null) {
979       ping.failed(getPingFailure());
980       ping = null;
981     }
982 
983     if (!goAwaySent) {
984       // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
985       // streams. The GOAWAY is part of graceful shutdown.
986       goAwaySent = true;
987       frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
988     }
989 
990     // We will close the underlying socket in the writing thread to break out the reader
991     // thread, which will close the frameReader and notify the listener.
992     frameWriter.close();
993   }
994 
995   @GuardedBy("lock")
maybeClearInUse(OkHttpClientStream stream)996   private void maybeClearInUse(OkHttpClientStream stream) {
997     if (hasStream) {
998       if (pendingStreams.isEmpty() && streams.isEmpty()) {
999         hasStream = false;
1000         if (keepAliveManager != null) {
1001           // We don't have any active streams. No need to do keepalives any more.
1002           // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1003           // and onTransportActive.
1004           keepAliveManager.onTransportIdle();
1005         }
1006       }
1007     }
1008     if (stream.shouldBeCountedForInUse()) {
1009       inUseState.updateObjectInUse(stream, false);
1010     }
1011   }
1012 
1013   @GuardedBy("lock")
setInUse(OkHttpClientStream stream)1014   private void setInUse(OkHttpClientStream stream) {
1015     if (!hasStream) {
1016       hasStream = true;
1017       if (keepAliveManager != null) {
1018         // We have a new stream. We might need to do keepalives now.
1019         // Note that we have to do this inside the lock to avoid calling
1020         // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1021         // order.
1022         keepAliveManager.onTransportActive();
1023       }
1024     }
1025     if (stream.shouldBeCountedForInUse()) {
1026       inUseState.updateObjectInUse(stream, true);
1027     }
1028   }
1029 
getPingFailure()1030   private Throwable getPingFailure() {
1031     synchronized (lock) {
1032       if (goAwayStatus != null) {
1033         return goAwayStatus.asException();
1034       } else {
1035         return Status.UNAVAILABLE.withDescription("Connection closed").asException();
1036       }
1037     }
1038   }
1039 
mayHaveCreatedStream(int streamId)1040   boolean mayHaveCreatedStream(int streamId) {
1041     synchronized (lock) {
1042       return streamId < nextStreamId && (streamId & 1) == 1;
1043     }
1044   }
1045 
getStream(int streamId)1046   OkHttpClientStream getStream(int streamId) {
1047     synchronized (lock) {
1048       return streams.get(streamId);
1049     }
1050   }
1051 
1052   /**
1053    * Returns a Grpc status corresponding to the given ErrorCode.
1054    */
1055   @VisibleForTesting
toGrpcStatus(ErrorCode code)1056   static Status toGrpcStatus(ErrorCode code) {
1057     Status status = ERROR_CODE_TO_STATUS.get(code);
1058     return status != null ? status : Status.UNKNOWN.withDescription(
1059         "Unknown http2 error code: " + code.httpCode);
1060   }
1061 
1062   @Override
getStats()1063   public ListenableFuture<SocketStats> getStats() {
1064     SettableFuture<SocketStats> ret = SettableFuture.create();
1065     synchronized (lock) {
1066       if (socket == null) {
1067         ret.set(new SocketStats(
1068             transportTracer.getStats(),
1069             /*local=*/ null,
1070             /*remote=*/ null,
1071             new InternalChannelz.SocketOptions.Builder().build(),
1072             /*security=*/ null));
1073       } else {
1074         ret.set(new SocketStats(
1075             transportTracer.getStats(),
1076             socket.getLocalSocketAddress(),
1077             socket.getRemoteSocketAddress(),
1078             Utils.getSocketOptions(socket),
1079             securityInfo));
1080       }
1081       return ret;
1082     }
1083   }
1084 
1085   /**
1086    * Runnable which reads frames and dispatches them to in flight calls.
1087    */
1088   class ClientFrameHandler implements FrameReader.Handler, Runnable {
1089 
1090     private final OkHttpFrameLogger logger =
1091         new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1092     FrameReader frameReader;
1093     boolean firstSettings = true;
1094 
ClientFrameHandler(FrameReader frameReader)1095     ClientFrameHandler(FrameReader frameReader) {
1096       this.frameReader = frameReader;
1097     }
1098 
1099     @Override
run()1100     public void run() {
1101       String threadName = Thread.currentThread().getName();
1102       Thread.currentThread().setName("OkHttpClientTransport");
1103       try {
1104         // Read until the underlying socket closes.
1105         while (frameReader.nextFrame(this)) {
1106           if (keepAliveManager != null) {
1107             keepAliveManager.onDataReceived();
1108           }
1109         }
1110         // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1111         // it may be triggered by the socket closing, in such case, the startGoAway() will do
1112         // nothing, otherwise, we finish all streams since it's a real IO issue.
1113         Status status;
1114         synchronized (lock) {
1115           status = goAwayStatus;
1116         }
1117         if (status == null) {
1118           status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1119         }
1120         startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1121       } catch (Throwable t) {
1122         // TODO(madongfly): Send the exception message to the server.
1123         startGoAway(
1124             0,
1125             ErrorCode.PROTOCOL_ERROR,
1126             Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1127       } finally {
1128         try {
1129           frameReader.close();
1130         } catch (IOException ex) {
1131           log.log(Level.INFO, "Exception closing frame reader", ex);
1132         }
1133         listener.transportTerminated();
1134         Thread.currentThread().setName(threadName);
1135       }
1136     }
1137 
1138     /**
1139      * Handle an HTTP2 DATA frame.
1140      */
1141     @SuppressWarnings("GuardedBy")
1142     @Override
data(boolean inFinished, int streamId, BufferedSource in, int length)1143     public void data(boolean inFinished, int streamId, BufferedSource in, int length)
1144         throws IOException {
1145       logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1146           streamId, in.getBuffer(), length, inFinished);
1147       OkHttpClientStream stream = getStream(streamId);
1148       if (stream == null) {
1149         if (mayHaveCreatedStream(streamId)) {
1150           synchronized (lock) {
1151             frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1152           }
1153           in.skip(length);
1154         } else {
1155           onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1156           return;
1157         }
1158       } else {
1159         // Wait until the frame is complete.
1160         in.require(length);
1161 
1162         Buffer buf = new Buffer();
1163         buf.write(in.getBuffer(), length);
1164         PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1165             stream.transportState().tag());
1166         synchronized (lock) {
1167           // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1168           // instead found: 'OkHttpClientTransport.this.lock'
1169           stream.transportState().transportDataReceived(buf, inFinished);
1170         }
1171       }
1172 
1173       // connection window update
1174       connectionUnacknowledgedBytesRead += length;
1175       if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1176         synchronized (lock) {
1177           frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1178         }
1179         connectionUnacknowledgedBytesRead = 0;
1180       }
1181     }
1182 
1183     /**
1184      * Handle HTTP2 HEADER and CONTINUATION frames.
1185      */
1186     @SuppressWarnings("GuardedBy")
1187     @Override
headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)1188     public void headers(boolean outFinished,
1189         boolean inFinished,
1190         int streamId,
1191         int associatedStreamId,
1192         List<Header> headerBlock,
1193         HeadersMode headersMode) {
1194       logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1195       boolean unknownStream = false;
1196       Status failedStatus = null;
1197       if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1198         int metadataSize = headerBlockSize(headerBlock);
1199         if (metadataSize > maxInboundMetadataSize) {
1200           failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1201               String.format(
1202                   Locale.US,
1203                   "Response %s metadata larger than %d: %d",
1204                   inFinished ? "trailer" : "header",
1205                   maxInboundMetadataSize,
1206                   metadataSize));
1207         }
1208       }
1209       synchronized (lock) {
1210         OkHttpClientStream stream = streams.get(streamId);
1211         if (stream == null) {
1212           if (mayHaveCreatedStream(streamId)) {
1213             frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1214           } else {
1215             unknownStream = true;
1216           }
1217         } else {
1218           if (failedStatus == null) {
1219             PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1220                 stream.transportState().tag());
1221             // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1222             // instead found: 'OkHttpClientTransport.this.lock'
1223             stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1224           } else {
1225             if (!inFinished) {
1226               frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1227             }
1228             stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1229           }
1230         }
1231       }
1232       if (unknownStream) {
1233         // We don't expect any server-initiated streams.
1234         onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1235       }
1236     }
1237 
headerBlockSize(List<Header> headerBlock)1238     private int headerBlockSize(List<Header> headerBlock) {
1239       // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
1240       long size = 0;
1241       for (int i = 0; i < headerBlock.size(); i++) {
1242         Header header = headerBlock.get(i);
1243         size += 32 + header.name.size() + header.value.size();
1244       }
1245       size = Math.min(size, Integer.MAX_VALUE);
1246       return (int) size;
1247     }
1248 
1249     @Override
rstStream(int streamId, ErrorCode errorCode)1250     public void rstStream(int streamId, ErrorCode errorCode) {
1251       logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1252       Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1253       boolean stopDelivery =
1254           (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1255       synchronized (lock) {
1256         OkHttpClientStream stream = streams.get(streamId);
1257         if (stream != null) {
1258           PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1259               stream.transportState().tag());
1260           finishStream(
1261               streamId, status,
1262               errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1263               stopDelivery, null, null);
1264         }
1265       }
1266     }
1267 
1268     @Override
settings(boolean clearPrevious, Settings settings)1269     public void settings(boolean clearPrevious, Settings settings) {
1270       logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1271       boolean outboundWindowSizeIncreased = false;
1272       synchronized (lock) {
1273         if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1274           int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1275               settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1276           maxConcurrentStreams = receivedMaxConcurrentStreams;
1277         }
1278 
1279         if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1280           int initialWindowSize = OkHttpSettingsUtil.get(
1281               settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1282           outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1283         }
1284         if (firstSettings) {
1285           listener.transportReady();
1286           firstSettings = false;
1287         }
1288 
1289         // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1290         // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1291         // otherwise it will cause a stream error (RST_STREAM).
1292         frameWriter.ackSettings(settings);
1293 
1294         // send any pending bytes / streams
1295         if (outboundWindowSizeIncreased) {
1296           outboundFlow.writeStreams();
1297         }
1298         startPendingStreams();
1299       }
1300     }
1301 
1302     @Override
ping(boolean ack, int payload1, int payload2)1303     public void ping(boolean ack, int payload1, int payload2) {
1304       long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1305       logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1306       if (!ack) {
1307         synchronized (lock) {
1308           frameWriter.ping(true, payload1, payload2);
1309         }
1310       } else {
1311         Http2Ping p = null;
1312         synchronized (lock) {
1313           if (ping != null) {
1314             if (ping.payload() == ackPayload) {
1315               p = ping;
1316               ping = null;
1317             } else {
1318               log.log(Level.WARNING, String.format(
1319                   Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1320                   ping.payload(), ackPayload));
1321             }
1322           } else {
1323             log.warning("Received unexpected ping ack. No ping outstanding");
1324           }
1325         }
1326         // don't complete it while holding lock since callbacks could run immediately
1327         if (p != null) {
1328           p.complete();
1329         }
1330       }
1331     }
1332 
1333     @Override
ackSettings()1334     public void ackSettings() {
1335       // Do nothing currently.
1336     }
1337 
1338     @Override
goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)1339     public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1340       logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1341       if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1342         String data = debugData.utf8();
1343         log.log(Level.WARNING, String.format(
1344             "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1345         if ("too_many_pings".equals(data)) {
1346           tooManyPingsRunnable.run();
1347         }
1348       }
1349       Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1350           .augmentDescription("Received Goaway");
1351       if (debugData.size() > 0) {
1352         // If a debug message was provided, use it.
1353         status = status.augmentDescription(debugData.utf8());
1354       }
1355       startGoAway(lastGoodStreamId, null, status);
1356     }
1357 
1358     @Override
pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)1359     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1360         throws IOException {
1361       logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1362           streamId, promisedStreamId, requestHeaders);
1363       // We don't accept server initiated stream.
1364       synchronized (lock) {
1365         frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1366       }
1367     }
1368 
1369     @Override
windowUpdate(int streamId, long delta)1370     public void windowUpdate(int streamId, long delta) {
1371       logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1372       if (delta == 0) {
1373         String errorMsg = "Received 0 flow control window increment.";
1374         if (streamId == 0) {
1375           onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
1376         } else {
1377           finishStream(
1378               streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
1379               ErrorCode.PROTOCOL_ERROR, null);
1380         }
1381         return;
1382       }
1383 
1384       boolean unknownStream = false;
1385       synchronized (lock) {
1386         if (streamId == Utils.CONNECTION_STREAM_ID) {
1387           outboundFlow.windowUpdate(null, (int) delta);
1388           return;
1389         }
1390 
1391         OkHttpClientStream stream = streams.get(streamId);
1392         if (stream != null) {
1393           outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1394         } else if (!mayHaveCreatedStream(streamId)) {
1395           unknownStream = true;
1396         }
1397       }
1398       if (unknownStream) {
1399         onError(ErrorCode.PROTOCOL_ERROR,
1400             "Received window_update for unknown stream: " + streamId);
1401       }
1402     }
1403 
1404     @Override
priority(int streamId, int streamDependency, int weight, boolean exclusive)1405     public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1406       // Ignore priority change.
1407       // TODO(madongfly): log
1408     }
1409 
1410     @Override
alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)1411     public void alternateService(int streamId, String origin, ByteString protocol, String host,
1412         int port, long maxAge) {
1413       // TODO(madongfly): Deal with alternateService propagation
1414     }
1415   }
1416 }
1417