• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import static com.google.common.base.Preconditions.checkState;
20 import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.MoreObjects;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.base.Supplier;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.SettableFuture;
29 import com.squareup.okhttp.Credentials;
30 import com.squareup.okhttp.HttpUrl;
31 import com.squareup.okhttp.Request;
32 import com.squareup.okhttp.internal.http.StatusLine;
33 import io.grpc.Attributes;
34 import io.grpc.CallOptions;
35 import io.grpc.Grpc;
36 import io.grpc.InternalChannelz;
37 import io.grpc.InternalChannelz.SocketStats;
38 import io.grpc.InternalLogId;
39 import io.grpc.Metadata;
40 import io.grpc.MethodDescriptor;
41 import io.grpc.MethodDescriptor.MethodType;
42 import io.grpc.SecurityLevel;
43 import io.grpc.Status;
44 import io.grpc.Status.Code;
45 import io.grpc.StatusException;
46 import io.grpc.internal.ClientStreamListener.RpcProgress;
47 import io.grpc.internal.ConnectionClientTransport;
48 import io.grpc.internal.GrpcAttributes;
49 import io.grpc.internal.GrpcUtil;
50 import io.grpc.internal.Http2Ping;
51 import io.grpc.internal.KeepAliveManager;
52 import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
53 import io.grpc.internal.ProxyParameters;
54 import io.grpc.internal.SerializingExecutor;
55 import io.grpc.internal.SharedResourceHolder;
56 import io.grpc.internal.StatsTraceContext;
57 import io.grpc.internal.TransportTracer;
58 import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler;
59 import io.grpc.okhttp.internal.ConnectionSpec;
60 import io.grpc.okhttp.internal.framed.ErrorCode;
61 import io.grpc.okhttp.internal.framed.FrameReader;
62 import io.grpc.okhttp.internal.framed.FrameWriter;
63 import io.grpc.okhttp.internal.framed.Header;
64 import io.grpc.okhttp.internal.framed.HeadersMode;
65 import io.grpc.okhttp.internal.framed.Http2;
66 import io.grpc.okhttp.internal.framed.Settings;
67 import io.grpc.okhttp.internal.framed.Variant;
68 import java.io.EOFException;
69 import java.io.IOException;
70 import java.net.InetSocketAddress;
71 import java.net.Socket;
72 import java.net.URI;
73 import java.util.Collections;
74 import java.util.EnumMap;
75 import java.util.HashMap;
76 import java.util.Iterator;
77 import java.util.LinkedList;
78 import java.util.List;
79 import java.util.Map;
80 import java.util.Random;
81 import java.util.concurrent.Executor;
82 import java.util.concurrent.ScheduledExecutorService;
83 import java.util.logging.Level;
84 import java.util.logging.Logger;
85 import javax.annotation.Nullable;
86 import javax.annotation.concurrent.GuardedBy;
87 import javax.net.ssl.HostnameVerifier;
88 import javax.net.ssl.SSLSession;
89 import javax.net.ssl.SSLSocket;
90 import javax.net.ssl.SSLSocketFactory;
91 import okio.Buffer;
92 import okio.BufferedSink;
93 import okio.BufferedSource;
94 import okio.ByteString;
95 import okio.Okio;
96 import okio.Source;
97 import okio.Timeout;
98 
99 /**
 * An OkHttp-based {@link ConnectionClientTransport} implementation.
101  */
102 class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler {
103   private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
104   private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
105   private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0];
106 
buildErrorCodeToStatusMap()107   private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
108     Map<ErrorCode, Status> errorToStatus = new EnumMap<ErrorCode, Status>(ErrorCode.class);
109     errorToStatus.put(ErrorCode.NO_ERROR,
110         Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
111     errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
112         Status.INTERNAL.withDescription("Protocol error"));
113     errorToStatus.put(ErrorCode.INTERNAL_ERROR,
114         Status.INTERNAL.withDescription("Internal error"));
115     errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
116         Status.INTERNAL.withDescription("Flow control error"));
117     errorToStatus.put(ErrorCode.STREAM_CLOSED,
118         Status.INTERNAL.withDescription("Stream closed"));
119     errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
120         Status.INTERNAL.withDescription("Frame too large"));
121     errorToStatus.put(ErrorCode.REFUSED_STREAM,
122         Status.UNAVAILABLE.withDescription("Refused stream"));
123     errorToStatus.put(ErrorCode.CANCEL,
124         Status.CANCELLED.withDescription("Cancelled"));
125     errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
126         Status.INTERNAL.withDescription("Compression error"));
127     errorToStatus.put(ErrorCode.CONNECT_ERROR,
128         Status.INTERNAL.withDescription("Connect error"));
129     errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
130         Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
131     errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
132         Status.PERMISSION_DENIED.withDescription("Inadequate security"));
133     return Collections.unmodifiableMap(errorToStatus);
134   }
135 
136   private final InetSocketAddress address;
137   private final String defaultAuthority;
138   private final String userAgent;
139   private final Random random = new Random();
140   // Returns new unstarted stopwatches
141   private final Supplier<Stopwatch> stopwatchFactory;
142   private Listener listener;
143   private FrameReader testFrameReader;
144   private AsyncFrameWriter frameWriter;
145   private OutboundFlowController outboundFlow;
146   private final Object lock = new Object();
147   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
148   @GuardedBy("lock")
149   private int nextStreamId;
150   @GuardedBy("lock")
151   private final Map<Integer, OkHttpClientStream> streams =
152       new HashMap<Integer, OkHttpClientStream>();
153   private final Executor executor;
  // Wraps the executor to guarantee that certain operations are executed serially.
155   private final SerializingExecutor serializingExecutor;
156   private final int maxMessageSize;
157   private int connectionUnacknowledgedBytesRead;
158   private ClientFrameHandler clientFrameHandler;
159   // Caution: Not synchronized, new value can only be safely read after the connection is complete.
160   private Attributes attributes = Attributes.EMPTY;
161   /**
162    * Indicates the transport is in go-away state: no new streams will be processed, but existing
163    * streams may continue.
164    */
165   @GuardedBy("lock")
166   private Status goAwayStatus;
167   @GuardedBy("lock")
168   private boolean goAwaySent;
169   @GuardedBy("lock")
170   private Http2Ping ping;
171   @GuardedBy("lock")
172   private boolean stopped;
173   @GuardedBy("lock")
174   private boolean inUse;
175   private SSLSocketFactory sslSocketFactory;
176   private HostnameVerifier hostnameVerifier;
177   private Socket socket;
178   @GuardedBy("lock")
179   private int maxConcurrentStreams = 0;
180   @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
181   @GuardedBy("lock")
182   private LinkedList<OkHttpClientStream> pendingStreams = new LinkedList<OkHttpClientStream>();
183   private final ConnectionSpec connectionSpec;
184   private FrameWriter testFrameWriter;
185   private ScheduledExecutorService scheduler;
186   private KeepAliveManager keepAliveManager;
187   private boolean enableKeepAlive;
188   private long keepAliveTimeNanos;
189   private long keepAliveTimeoutNanos;
190   private boolean keepAliveWithoutCalls;
191   private final Runnable tooManyPingsRunnable;
192   @GuardedBy("lock")
193   private final TransportTracer transportTracer;
194   @GuardedBy("lock")
195   private InternalChannelz.Security securityInfo;
196 
197   @VisibleForTesting
198   @Nullable
199   final ProxyParameters proxy;
200 
  // The following fields should only be used for testing.
202   Runnable connectingCallback;
203   SettableFuture<Void> connectedFuture;
204 
205 
OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent, Executor executor, @Nullable SSLSocketFactory sslSocketFactory, @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec, int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable, TransportTracer transportTracer)206   OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent,
207       Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
208       @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
209       int maxMessageSize, @Nullable ProxyParameters proxy, Runnable tooManyPingsRunnable,
210       TransportTracer transportTracer) {
211     this.address = Preconditions.checkNotNull(address, "address");
212     this.defaultAuthority = authority;
213     this.maxMessageSize = maxMessageSize;
214     this.executor = Preconditions.checkNotNull(executor, "executor");
215     serializingExecutor = new SerializingExecutor(executor);
216     // Client initiated streams are odd, server initiated ones are even. Server should not need to
217     // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
218     nextStreamId = 3;
219     this.sslSocketFactory = sslSocketFactory;
220     this.hostnameVerifier = hostnameVerifier;
221     this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec");
222     this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER;
223     this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
224     this.proxy = proxy;
225     this.tooManyPingsRunnable =
226         Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
227     this.transportTracer = Preconditions.checkNotNull(transportTracer);
228     initTransportTracer();
229   }
230 
231   /**
232    * Create a transport connected to a fake peer for test.
233    */
234   @VisibleForTesting
OkHttpClientTransport( String userAgent, Executor executor, FrameReader frameReader, FrameWriter testFrameWriter, int nextStreamId, Socket socket, Supplier<Stopwatch> stopwatchFactory, @Nullable Runnable connectingCallback, SettableFuture<Void> connectedFuture, int maxMessageSize, Runnable tooManyPingsRunnable, TransportTracer transportTracer)235   OkHttpClientTransport(
236       String userAgent,
237       Executor executor,
238       FrameReader frameReader,
239       FrameWriter testFrameWriter,
240       int nextStreamId,
241       Socket socket,
242       Supplier<Stopwatch> stopwatchFactory,
243       @Nullable Runnable connectingCallback,
244       SettableFuture<Void> connectedFuture,
245       int maxMessageSize,
246       Runnable tooManyPingsRunnable,
247       TransportTracer transportTracer) {
248     address = null;
249     this.maxMessageSize = maxMessageSize;
250     defaultAuthority = "notarealauthority:80";
251     this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
252     this.executor = Preconditions.checkNotNull(executor, "executor");
253     serializingExecutor = new SerializingExecutor(executor);
254     this.testFrameReader = Preconditions.checkNotNull(frameReader, "frameReader");
255     this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter");
256     this.socket = Preconditions.checkNotNull(socket, "socket");
257     this.nextStreamId = nextStreamId;
258     this.stopwatchFactory = stopwatchFactory;
259     this.connectionSpec = null;
260     this.connectingCallback = connectingCallback;
261     this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
262     this.proxy = null;
263     this.tooManyPingsRunnable =
264         Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
265     this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
266     initTransportTracer();
267   }
268 
initTransportTracer()269   private void initTransportTracer() {
270     synchronized (lock) { // to make @GuardedBy linter happy
271       transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
272         @Override
273         public TransportTracer.FlowControlWindows read() {
274           synchronized (lock) {
275             long local = -1; // okhttp does not track the local window size
276             long remote = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
277             return new TransportTracer.FlowControlWindows(local, remote);
278           }
279         }
280       });
281     }
282   }
283 
284   /**
285    * Enable keepalive with custom delay and timeout.
286    */
enableKeepAlive(boolean enable, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls)287   void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
288       long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
289     enableKeepAlive = enable;
290     this.keepAliveTimeNanos = keepAliveTimeNanos;
291     this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
292     this.keepAliveWithoutCalls = keepAliveWithoutCalls;
293   }
294 
isForTest()295   private boolean isForTest() {
296     return address == null;
297   }
298 
299   @Override
ping(final PingCallback callback, Executor executor)300   public void ping(final PingCallback callback, Executor executor) {
301     checkState(frameWriter != null);
302     long data = 0;
303     Http2Ping p;
304     boolean writePing;
305     synchronized (lock) {
306       if (stopped) {
307         Http2Ping.notifyFailed(callback, executor, getPingFailure());
308         return;
309       }
310       if (ping != null) {
311         // we only allow one outstanding ping at a time, so just add the callback to
312         // any outstanding operation
313         p = ping;
314         writePing = false;
315       } else {
316         // set outstanding operation and then write the ping after releasing lock
317         data = random.nextLong();
318         Stopwatch stopwatch = stopwatchFactory.get();
319         stopwatch.start();
320         p = ping = new Http2Ping(data, stopwatch);
321         writePing = true;
322         transportTracer.reportKeepAliveSent();
323       }
324     }
325     if (writePing) {
326       frameWriter.ping(false, (int) (data >>> 32), (int) data);
327     }
328     // If transport concurrently failed/stopped since we released the lock above, this could
329     // immediately invoke callback (which we shouldn't do while holding a lock)
330     p.addCallback(callback, executor);
331   }
332 
333   @Override
newStream(final MethodDescriptor<?, ?> method, final Metadata headers, CallOptions callOptions)334   public OkHttpClientStream newStream(final MethodDescriptor<?, ?> method,
335       final Metadata headers, CallOptions callOptions) {
336     Preconditions.checkNotNull(method, "method");
337     Preconditions.checkNotNull(headers, "headers");
338     StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
339     return new OkHttpClientStream(
340         method,
341         headers,
342         frameWriter,
343         OkHttpClientTransport.this,
344         outboundFlow,
345         lock,
346         maxMessageSize,
347         defaultAuthority,
348         userAgent,
349         statsTraceCtx,
350         transportTracer);
351   }
352 
353   @GuardedBy("lock")
streamReadyToStart(OkHttpClientStream clientStream)354   void streamReadyToStart(OkHttpClientStream clientStream) {
355     if (goAwayStatus != null) {
356       clientStream.transportState().transportReportStatus(
357           goAwayStatus, RpcProgress.REFUSED, true, new Metadata());
358     } else if (streams.size() >= maxConcurrentStreams) {
359       pendingStreams.add(clientStream);
360       setInUse();
361     } else {
362       startStream(clientStream);
363     }
364   }
365 
366   @GuardedBy("lock")
startStream(OkHttpClientStream stream)367   private void startStream(OkHttpClientStream stream) {
368     Preconditions.checkState(
369         stream.id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
370     streams.put(nextStreamId, stream);
371     setInUse();
372     stream.transportState().start(nextStreamId);
373     // For unary and server streaming, there will be a data frame soon, no need to flush the header.
374     if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
375         || stream.useGet()) {
376       frameWriter.flush();
377     }
378     if (nextStreamId >= Integer.MAX_VALUE - 2) {
379       // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
380       // correctly.
381       nextStreamId = Integer.MAX_VALUE;
382       startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
383           Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
384     } else {
385       nextStreamId += 2;
386     }
387   }
388 
389   /**
390    * Starts pending streams, returns true if at least one pending stream is started.
391    */
392   @GuardedBy("lock")
startPendingStreams()393   private boolean startPendingStreams() {
394     boolean hasStreamStarted = false;
395     while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
396       OkHttpClientStream stream = pendingStreams.poll();
397       startStream(stream);
398       hasStreamStarted = true;
399     }
400     return hasStreamStarted;
401   }
402 
403   /**
404    * Removes given pending stream, used when a pending stream is cancelled.
405    */
406   @GuardedBy("lock")
removePendingStream(OkHttpClientStream pendingStream)407   void removePendingStream(OkHttpClientStream pendingStream) {
408     pendingStreams.remove(pendingStream);
409     maybeClearInUse();
410   }
411 
412   @Override
start(Listener listener)413   public Runnable start(Listener listener) {
414     this.listener = Preconditions.checkNotNull(listener, "listener");
415 
416     if (enableKeepAlive) {
417       scheduler = SharedResourceHolder.get(TIMER_SERVICE);
418       keepAliveManager = new KeepAliveManager(
419           new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
420           keepAliveWithoutCalls);
421       keepAliveManager.onTransportStarted();
422     }
423 
424     frameWriter = new AsyncFrameWriter(this, serializingExecutor);
425     outboundFlow = new OutboundFlowController(this, frameWriter);
426     // Connecting in the serializingExecutor, so that some stream operations like synStream
427     // will be executed after connected.
428     serializingExecutor.execute(new Runnable() {
429       @Override
430       public void run() {
431         if (isForTest()) {
432           if (connectingCallback != null) {
433             connectingCallback.run();
434           }
435           clientFrameHandler = new ClientFrameHandler(testFrameReader);
436           executor.execute(clientFrameHandler);
437           synchronized (lock) {
438             maxConcurrentStreams = Integer.MAX_VALUE;
439             startPendingStreams();
440           }
441           frameWriter.becomeConnected(testFrameWriter, socket);
442           connectedFuture.set(null);
443           return;
444         }
445 
446         // Use closed source on failure so that the reader immediately shuts down.
447         BufferedSource source = Okio.buffer(new Source() {
448           @Override
449           public long read(Buffer sink, long byteCount) {
450             return -1;
451           }
452 
453           @Override
454           public Timeout timeout() {
455             return Timeout.NONE;
456           }
457 
458           @Override
459           public void close() {}
460         });
461         Variant variant = new Http2();
462         BufferedSink sink;
463         Socket sock;
464         SSLSession sslSession = null;
465         try {
466           if (proxy == null) {
467             sock = new Socket(address.getAddress(), address.getPort());
468           } else {
469             sock = createHttpProxySocket(
470                 address, proxy.proxyAddress, proxy.username, proxy.password);
471           }
472 
473           if (sslSocketFactory != null) {
474             SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
475                 sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
476                 connectionSpec);
477             sslSession = sslSocket.getSession();
478             sock = sslSocket;
479           }
480           sock.setTcpNoDelay(true);
481           source = Okio.buffer(Okio.source(sock));
482           sink = Okio.buffer(Okio.sink(sock));
483           // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
484           attributes = Attributes
485               .newBuilder()
486               .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
487               .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
488               .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
489               .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
490                   sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
491               .build();
492         } catch (StatusException e) {
493           startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
494           return;
495         } catch (Exception e) {
496           onException(e);
497           return;
498         } finally {
499           clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
500           executor.execute(clientFrameHandler);
501         }
502 
503         FrameWriter rawFrameWriter;
504         synchronized (lock) {
505           socket = Preconditions.checkNotNull(sock, "socket");
506           maxConcurrentStreams = Integer.MAX_VALUE;
507           startPendingStreams();
508           if (sslSession != null) {
509             securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
510           }
511         }
512 
513         rawFrameWriter = variant.newWriter(sink, true);
514         frameWriter.becomeConnected(rawFrameWriter, socket);
515 
516         try {
517           // Do these with the raw FrameWriter, so that they will be done in this thread,
518           // and before any possible pending stream operations.
519           rawFrameWriter.connectionPreface();
520           Settings settings = new Settings();
521           rawFrameWriter.settings(settings);
522         } catch (Exception e) {
523           onException(e);
524           return;
525         }
526       }
527     });
528     return null;
529   }
530 
createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyUsername, String proxyPassword)531   private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
532       String proxyUsername, String proxyPassword) throws IOException, StatusException {
533     try {
534       Socket sock;
535       // The proxy address may not be resolved
536       if (proxyAddress.getAddress() != null) {
537         sock = new Socket(proxyAddress.getAddress(), proxyAddress.getPort());
538       } else {
539         sock = new Socket(proxyAddress.getHostName(), proxyAddress.getPort());
540       }
541       sock.setTcpNoDelay(true);
542 
543       Source source = Okio.source(sock);
544       BufferedSink sink = Okio.buffer(Okio.sink(sock));
545 
546       // Prepare headers and request method line
547       Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
548       HttpUrl url = proxyRequest.httpUrl();
549       String requestLine = String.format("CONNECT %s:%d HTTP/1.1", url.host(), url.port());
550 
551       // Write request to socket
552       sink.writeUtf8(requestLine).writeUtf8("\r\n");
553       for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
554         sink.writeUtf8(proxyRequest.headers().name(i))
555             .writeUtf8(": ")
556             .writeUtf8(proxyRequest.headers().value(i))
557             .writeUtf8("\r\n");
558       }
559       sink.writeUtf8("\r\n");
560       // Flush buffer (flushes socket and sends request)
561       sink.flush();
562 
563       // Read status line, check if 2xx was returned
564       StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
565       // Drain rest of headers
566       while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
567       if (statusLine.code < 200 || statusLine.code >= 300) {
568         Buffer body = new Buffer();
569         try {
570           sock.shutdownOutput();
571           source.read(body, 1024);
572         } catch (IOException ex) {
573           body.writeUtf8("Unable to read body: " + ex.toString());
574         }
575         try {
576           sock.close();
577         } catch (IOException ignored) {
578           // ignored
579         }
580         String message = String.format(
581             "Response returned from proxy was not successful (expected 2xx, got %d %s). "
582               + "Response body:\n%s",
583             statusLine.code, statusLine.message, body.readUtf8());
584         throw Status.UNAVAILABLE.withDescription(message).asException();
585       }
586       return sock;
587     } catch (IOException e) {
588       throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
589           .asException();
590     }
591   }
592 
createHttpProxyRequest(InetSocketAddress address, String proxyUsername, String proxyPassword)593   private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
594       String proxyPassword) {
595     HttpUrl tunnelUrl = new HttpUrl.Builder()
596         .scheme("https")
597         .host(address.getHostName())
598         .port(address.getPort())
599         .build();
600     Request.Builder request = new Request.Builder()
601         .url(tunnelUrl)
602         .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
603         .header("User-Agent", userAgent);
604 
605     // If we have proxy credentials, set them right away
606     if (proxyUsername != null && proxyPassword != null) {
607       request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
608     }
609     return request.build();
610   }
611 
readUtf8LineStrictUnbuffered(Source source)612   private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
613     Buffer buffer = new Buffer();
614     while (true) {
615       if (source.read(buffer, 1) == -1) {
616         throw new EOFException("\\n not found: " + buffer.readByteString().hex());
617       }
618       if (buffer.getByte(buffer.size() - 1) == '\n') {
619         return buffer.readUtf8LineStrict();
620       }
621     }
622   }
623 
624   @Override
toString()625   public String toString() {
626     return MoreObjects.toStringHelper(this)
627         .add("logId", logId.getId())
628         .add("address", address)
629         .toString();
630   }
631 
632   @Override
getLogId()633   public InternalLogId getLogId() {
634     return logId;
635   }
636 
637   /**
638    * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
639    * authority, uri.getHost() will (rightly) return null, since the authority is no longer
640    * an actual service.  This method overrides the behavior for practical reasons.  For example,
641    * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
642    * we return the input.  This is because the return value, in conjunction with getOverridenPort,
643    * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
644    * connection to the port, independent of this function.
645    *
646    * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
647    * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
648    * authorities do not have ports, so this is even more wrong than before.  Sorry.
649    */
650   @VisibleForTesting
getOverridenHost()651   String getOverridenHost() {
652     URI uri = GrpcUtil.authorityToUri(defaultAuthority);
653     if (uri.getHost() != null) {
654       return uri.getHost();
655     }
656 
657     return defaultAuthority;
658   }
659 
660   @VisibleForTesting
getOverridenPort()661   int getOverridenPort() {
662     URI uri = GrpcUtil.authorityToUri(defaultAuthority);
663     if (uri.getPort() != -1) {
664       return uri.getPort();
665     }
666 
667     return address.getPort();
668   }
669 
670   @Override
shutdown(Status reason)671   public void shutdown(Status reason) {
672     synchronized (lock) {
673       if (goAwayStatus != null) {
674         return;
675       }
676 
677       goAwayStatus = reason;
678       listener.transportShutdown(goAwayStatus);
679       stopIfNecessary();
680     }
681   }
682 
683   @Override
shutdownNow(Status reason)684   public void shutdownNow(Status reason) {
685     shutdown(reason);
686     synchronized (lock) {
687       Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
688       while (it.hasNext()) {
689         Map.Entry<Integer, OkHttpClientStream> entry = it.next();
690         it.remove();
691         entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
692       }
693 
694       for (OkHttpClientStream stream : pendingStreams) {
695         stream.transportState().transportReportStatus(reason, true, new Metadata());
696       }
697       pendingStreams.clear();
698       maybeClearInUse();
699 
700       stopIfNecessary();
701     }
702   }
703 
704   @Override
getAttributes()705   public Attributes getAttributes() {
706     return attributes;
707   }
708 
709   /**
710    * Gets all active streams as an array.
711    */
getActiveStreams()712   OkHttpClientStream[] getActiveStreams() {
713     synchronized (lock) {
714       return streams.values().toArray(EMPTY_STREAM_ARRAY);
715     }
716   }
717 
718   @VisibleForTesting
getHandler()719   ClientFrameHandler getHandler() {
720     return clientFrameHandler;
721   }
722 
723   @VisibleForTesting
getPendingStreamSize()724   int getPendingStreamSize() {
725     synchronized (lock) {
726       return pendingStreams.size();
727     }
728   }
729 
730   /**
731    * Finish all active streams due to an IOException, then close the transport.
732    */
733   @Override
onException(Throwable failureCause)734   public void onException(Throwable failureCause) {
735     Preconditions.checkNotNull(failureCause, "failureCause");
736     Status status = Status.UNAVAILABLE.withCause(failureCause);
737     startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
738   }
739 
740   /**
741    * Send GOAWAY to the server, then finish all active streams and close the transport.
742    */
onError(ErrorCode errorCode, String moreDetail)743   private void onError(ErrorCode errorCode, String moreDetail) {
744     startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
745   }
746 
startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status)747   private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
748     synchronized (lock) {
749       if (goAwayStatus == null) {
750         goAwayStatus = status;
751         listener.transportShutdown(status);
752       }
753       if (errorCode != null && !goAwaySent) {
754         // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
755         // streams. The GOAWAY is part of graceful shutdown.
756         goAwaySent = true;
757         frameWriter.goAway(0, errorCode, new byte[0]);
758       }
759 
760       Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
761       while (it.hasNext()) {
762         Map.Entry<Integer, OkHttpClientStream> entry = it.next();
763         if (entry.getKey() > lastKnownStreamId) {
764           it.remove();
765           entry.getValue().transportState().transportReportStatus(
766               status, RpcProgress.REFUSED, false, new Metadata());
767         }
768       }
769 
770       for (OkHttpClientStream stream : pendingStreams) {
771         stream.transportState().transportReportStatus(
772             status, RpcProgress.REFUSED, true, new Metadata());
773       }
774       pendingStreams.clear();
775       maybeClearInUse();
776 
777       stopIfNecessary();
778     }
779   }
780 
781   /**
782    * Called when a stream is closed, we do things like:
783    * <ul>
784    * <li>Removing the stream from the map.
785    * <li>Optionally reporting the status.
786    * <li>Starting pending streams if we can.
787    * <li>Stopping the transport if this is the last live stream under a go-away status.
788    * </ul>
789    *
790    * @param streamId the Id of the stream.
791    * @param status the final status of this stream, null means no need to report.
792    * @param stopDelivery interrupt queued messages in the deframer
793    * @param errorCode reset the stream with this ErrorCode if not null.
794    * @param trailers the trailers received if not null
795    */
finishStream( int streamId, @Nullable Status status, RpcProgress rpcProgress, boolean stopDelivery, @Nullable ErrorCode errorCode, @Nullable Metadata trailers)796   void finishStream(
797       int streamId,
798       @Nullable Status status,
799       RpcProgress rpcProgress,
800       boolean stopDelivery,
801       @Nullable ErrorCode errorCode,
802       @Nullable Metadata trailers) {
803     synchronized (lock) {
804       OkHttpClientStream stream = streams.remove(streamId);
805       if (stream != null) {
806         if (errorCode != null) {
807           frameWriter.rstStream(streamId, ErrorCode.CANCEL);
808         }
809         if (status != null) {
810           stream
811               .transportState()
812               .transportReportStatus(
813                   status,
814                   rpcProgress,
815                   stopDelivery,
816                   trailers != null ? trailers : new Metadata());
817         }
818         if (!startPendingStreams()) {
819           stopIfNecessary();
820           maybeClearInUse();
821         }
822       }
823     }
824   }
825 
826   /**
827    * When the transport is in goAway state, we should stop it once all active streams finish.
828    */
829   @GuardedBy("lock")
stopIfNecessary()830   private void stopIfNecessary() {
831     if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
832       return;
833     }
834     if (stopped) {
835       return;
836     }
837     stopped = true;
838 
839     if (keepAliveManager != null) {
840       keepAliveManager.onTransportTermination();
841       // KeepAliveManager should stop using the scheduler after onTransportTermination gets called.
842       scheduler = SharedResourceHolder.release(TIMER_SERVICE, scheduler);
843     }
844 
845     if (ping != null) {
846       ping.failed(getPingFailure());
847       ping = null;
848     }
849 
850     if (!goAwaySent) {
851       // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
852       // streams. The GOAWAY is part of graceful shutdown.
853       goAwaySent = true;
854       frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
855     }
856 
857     // We will close the underlying socket in the writing thread to break out the reader
858     // thread, which will close the frameReader and notify the listener.
859     frameWriter.close();
860   }
861 
862   @GuardedBy("lock")
maybeClearInUse()863   private void maybeClearInUse() {
864     if (inUse) {
865       if (pendingStreams.isEmpty() && streams.isEmpty()) {
866         inUse = false;
867         listener.transportInUse(false);
868         if (keepAliveManager != null) {
869           // We don't have any active streams. No need to do keepalives any more.
870           // Again, we have to call this inside the lock to avoid the race between onTransportIdle
871           // and onTransportActive.
872           keepAliveManager.onTransportIdle();
873         }
874       }
875     }
876   }
877 
878   @GuardedBy("lock")
setInUse()879   private void setInUse() {
880     if (!inUse) {
881       inUse = true;
882       listener.transportInUse(true);
883       if (keepAliveManager != null) {
884         // We have a new stream. We might need to do keepalives now.
885         // Note that we have to do this inside the lock to avoid calling
886         // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
887         // order.
888         keepAliveManager.onTransportActive();
889       }
890     }
891   }
892 
getPingFailure()893   private Throwable getPingFailure() {
894     synchronized (lock) {
895       if (goAwayStatus != null) {
896         return goAwayStatus.asException();
897       } else {
898         return Status.UNAVAILABLE.withDescription("Connection closed").asException();
899       }
900     }
901   }
902 
mayHaveCreatedStream(int streamId)903   boolean mayHaveCreatedStream(int streamId) {
904     synchronized (lock) {
905       return streamId < nextStreamId && (streamId & 1) == 1;
906     }
907   }
908 
getStream(int streamId)909   OkHttpClientStream getStream(int streamId) {
910     synchronized (lock) {
911       return streams.get(streamId);
912     }
913   }
914 
915   /**
916    * Returns a Grpc status corresponding to the given ErrorCode.
917    */
918   @VisibleForTesting
toGrpcStatus(ErrorCode code)919   static Status toGrpcStatus(ErrorCode code) {
920     Status status = ERROR_CODE_TO_STATUS.get(code);
921     return status != null ? status : Status.UNKNOWN.withDescription(
922         "Unknown http2 error code: " + code.httpCode);
923   }
924 
925   @Override
getStats()926   public ListenableFuture<SocketStats> getStats() {
927     SettableFuture<SocketStats> ret = SettableFuture.create();
928     synchronized (lock) {
929       if (socket == null) {
930         ret.set(new SocketStats(
931             transportTracer.getStats(),
932             /*local=*/ null,
933             /*remote=*/ null,
934             new InternalChannelz.SocketOptions.Builder().build(),
935             /*security=*/ null));
936       } else {
937         ret.set(new SocketStats(
938             transportTracer.getStats(),
939             socket.getLocalSocketAddress(),
940             socket.getRemoteSocketAddress(),
941             Utils.getSocketOptions(socket),
942             securityInfo));
943       }
944       return ret;
945     }
946   }
947 
948   /**
949    * Runnable which reads frames and dispatches them to in flight calls.
950    */
951   @VisibleForTesting
952   class ClientFrameHandler implements FrameReader.Handler, Runnable {
953     FrameReader frameReader;
954     boolean firstSettings = true;
955 
ClientFrameHandler(FrameReader frameReader)956     ClientFrameHandler(FrameReader frameReader) {
957       this.frameReader = frameReader;
958     }
959 
960     @Override
run()961     public void run() {
962       String threadName = Thread.currentThread().getName();
963       if (!GrpcUtil.IS_RESTRICTED_APPENGINE) {
964         Thread.currentThread().setName("OkHttpClientTransport");
965       }
966       try {
967         // Read until the underlying socket closes.
968         while (frameReader.nextFrame(this)) {
969           if (keepAliveManager != null) {
970             keepAliveManager.onDataReceived();
971           }
972         }
973         // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
974         // it may be triggered by the socket closing, in such case, the startGoAway() will do
975         // nothing, otherwise, we finish all streams since it's a real IO issue.
976         startGoAway(0, ErrorCode.INTERNAL_ERROR,
977             Status.UNAVAILABLE.withDescription("End of stream or IOException"));
978       } catch (Throwable t) {
979         // TODO(madongfly): Send the exception message to the server.
980         startGoAway(
981             0,
982             ErrorCode.PROTOCOL_ERROR,
983             Status.UNAVAILABLE.withDescription("error in frame handler").withCause(t));
984       } finally {
985         try {
986           frameReader.close();
987         } catch (IOException ex) {
988           log.log(Level.INFO, "Exception closing frame reader", ex);
989         }
990         listener.transportTerminated();
991         if (!GrpcUtil.IS_RESTRICTED_APPENGINE) {
992           // Restore the original thread name.
993           Thread.currentThread().setName(threadName);
994         }
995       }
996     }
997 
998     /**
999      * Handle a HTTP2 DATA frame.
1000      */
1001     @Override
data(boolean inFinished, int streamId, BufferedSource in, int length)1002     public void data(boolean inFinished, int streamId, BufferedSource in, int length)
1003         throws IOException {
1004       OkHttpClientStream stream = getStream(streamId);
1005       if (stream == null) {
1006         if (mayHaveCreatedStream(streamId)) {
1007           frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
1008           in.skip(length);
1009         } else {
1010           onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1011           return;
1012         }
1013       } else {
1014         // Wait until the frame is complete.
1015         in.require(length);
1016 
1017         Buffer buf = new Buffer();
1018         buf.write(in.buffer(), length);
1019         synchronized (lock) {
1020           stream.transportState().transportDataReceived(buf, inFinished);
1021         }
1022       }
1023 
1024       // connection window update
1025       connectionUnacknowledgedBytesRead += length;
1026       if (connectionUnacknowledgedBytesRead >= Utils.DEFAULT_WINDOW_SIZE / 2) {
1027         frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1028         connectionUnacknowledgedBytesRead = 0;
1029       }
1030     }
1031 
1032     /**
1033      * Handle HTTP2 HEADER and CONTINUATION frames.
1034      */
1035     @Override
headers(boolean outFinished, boolean inFinished, int streamId, int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode)1036     public void headers(boolean outFinished,
1037         boolean inFinished,
1038         int streamId,
1039         int associatedStreamId,
1040         List<Header> headerBlock,
1041         HeadersMode headersMode) {
1042       boolean unknownStream = false;
1043       synchronized (lock) {
1044         OkHttpClientStream stream = streams.get(streamId);
1045         if (stream == null) {
1046           if (mayHaveCreatedStream(streamId)) {
1047             frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
1048           } else {
1049             unknownStream = true;
1050           }
1051         } else {
1052           stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1053         }
1054       }
1055       if (unknownStream) {
1056         // We don't expect any server-initiated streams.
1057         onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1058       }
1059     }
1060 
1061     @Override
rstStream(int streamId, ErrorCode errorCode)1062     public void rstStream(int streamId, ErrorCode errorCode) {
1063       Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1064       boolean stopDelivery =
1065           (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1066       finishStream(
1067           streamId, status,
1068           errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1069           stopDelivery, null, null);
1070     }
1071 
1072     @Override
settings(boolean clearPrevious, Settings settings)1073     public void settings(boolean clearPrevious, Settings settings) {
1074       boolean outboundWindowSizeIncreased = false;
1075       synchronized (lock) {
1076         if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1077           int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1078               settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1079           maxConcurrentStreams = receivedMaxConcurrentStreams;
1080         }
1081 
1082         if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1083           int initialWindowSize = OkHttpSettingsUtil.get(
1084               settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1085           outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1086         }
1087         if (firstSettings) {
1088           listener.transportReady();
1089           firstSettings = false;
1090         }
1091 
1092         // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1093         // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1094         // otherwise it will cause a stream error (RST_STREAM).
1095         frameWriter.ackSettings(settings);
1096 
1097         // send any pending bytes / streams
1098         if (outboundWindowSizeIncreased) {
1099           outboundFlow.writeStreams();
1100         }
1101         startPendingStreams();
1102       }
1103     }
1104 
1105     @Override
ping(boolean ack, int payload1, int payload2)1106     public void ping(boolean ack, int payload1, int payload2) {
1107       if (!ack) {
1108         frameWriter.ping(true, payload1, payload2);
1109       } else {
1110         Http2Ping p = null;
1111         long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1112         synchronized (lock) {
1113           if (ping != null) {
1114             if (ping.payload() == ackPayload) {
1115               p = ping;
1116               ping = null;
1117             } else {
1118               log.log(Level.WARNING, String.format("Received unexpected ping ack. "
1119                   + "Expecting %d, got %d", ping.payload(), ackPayload));
1120             }
1121           } else {
1122             log.warning("Received unexpected ping ack. No ping outstanding");
1123           }
1124         }
1125         // don't complete it while holding lock since callbacks could run immediately
1126         if (p != null) {
1127           p.complete();
1128         }
1129       }
1130     }
1131 
1132     @Override
ackSettings()1133     public void ackSettings() {
1134       // Do nothing currently.
1135     }
1136 
1137     @Override
goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData)1138     public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1139       if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1140         String data = debugData.utf8();
1141         log.log(Level.WARNING, String.format(
1142             "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1143         if ("too_many_pings".equals(data)) {
1144           tooManyPingsRunnable.run();
1145         }
1146       }
1147       Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1148           .augmentDescription("Received Goaway");
1149       if (debugData.size() > 0) {
1150         // If a debug message was provided, use it.
1151         status = status.augmentDescription(debugData.utf8());
1152       }
1153       startGoAway(lastGoodStreamId, null, status);
1154     }
1155 
1156     @Override
pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)1157     public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1158         throws IOException {
1159       // We don't accept server initiated stream.
1160       frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1161     }
1162 
1163     @Override
windowUpdate(int streamId, long delta)1164     public void windowUpdate(int streamId, long delta) {
1165       if (delta == 0) {
1166         String errorMsg = "Received 0 flow control window increment.";
1167         if (streamId == 0) {
1168           onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
1169         } else {
1170           finishStream(
1171               streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
1172               ErrorCode.PROTOCOL_ERROR, null);
1173         }
1174         return;
1175       }
1176 
1177       boolean unknownStream = false;
1178       synchronized (lock) {
1179         if (streamId == Utils.CONNECTION_STREAM_ID) {
1180           outboundFlow.windowUpdate(null, (int) delta);
1181           return;
1182         }
1183 
1184         OkHttpClientStream stream = streams.get(streamId);
1185         if (stream != null) {
1186           outboundFlow.windowUpdate(stream, (int) delta);
1187         } else if (!mayHaveCreatedStream(streamId)) {
1188           unknownStream = true;
1189         }
1190       }
1191       if (unknownStream) {
1192         onError(ErrorCode.PROTOCOL_ERROR,
1193             "Received window_update for unknown stream: " + streamId);
1194       }
1195     }
1196 
1197     @Override
priority(int streamId, int streamDependency, int weight, boolean exclusive)1198     public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1199       // Ignore priority change.
1200       // TODO(madongfly): log
1201     }
1202 
1203     @Override
alternateService(int streamId, String origin, ByteString protocol, String host, int port, long maxAge)1204     public void alternateService(int streamId, String origin, ByteString protocol, String host,
1205         int port, long maxAge) {
1206       // TODO(madongfly): Deal with alternateService propagation
1207     }
1208   }
1209 }
1210