• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import static com.google.common.base.Charsets.UTF_8;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
22 import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
23 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
24 import static io.grpc.okhttp.Headers.CONTENT_TYPE_HEADER;
25 import static io.grpc.okhttp.Headers.METHOD_HEADER;
26 import static io.grpc.okhttp.Headers.SCHEME_HEADER;
27 import static io.grpc.okhttp.Headers.TE_HEADER;
28 import static org.junit.Assert.assertEquals;
29 import static org.junit.Assert.assertFalse;
30 import static org.junit.Assert.assertNotNull;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.junit.Assert.fail;
35 import static org.mockito.Matchers.any;
36 import static org.mockito.Matchers.anyBoolean;
37 import static org.mockito.Matchers.anyInt;
38 import static org.mockito.Matchers.eq;
39 import static org.mockito.Matchers.isA;
40 import static org.mockito.Matchers.same;
41 import static org.mockito.Mockito.inOrder;
42 import static org.mockito.Mockito.mock;
43 import static org.mockito.Mockito.never;
44 import static org.mockito.Mockito.reset;
45 import static org.mockito.Mockito.timeout;
46 import static org.mockito.Mockito.verify;
47 import static org.mockito.Mockito.verifyNoMoreInteractions;
48 import static org.mockito.Mockito.verifyZeroInteractions;
49 import static org.mockito.Mockito.when;
50 
51 import com.google.common.base.Stopwatch;
52 import com.google.common.base.Supplier;
53 import com.google.common.base.Ticker;
54 import com.google.common.collect.ImmutableList;
55 import com.google.common.util.concurrent.Futures;
56 import com.google.common.util.concurrent.MoreExecutors;
57 import com.google.common.util.concurrent.SettableFuture;
58 import io.grpc.CallOptions;
59 import io.grpc.InternalChannelz.SocketStats;
60 import io.grpc.InternalChannelz.TransportStats;
61 import io.grpc.InternalInstrumented;
62 import io.grpc.InternalStatus;
63 import io.grpc.Metadata;
64 import io.grpc.MethodDescriptor;
65 import io.grpc.MethodDescriptor.MethodType;
66 import io.grpc.Status;
67 import io.grpc.Status.Code;
68 import io.grpc.StatusException;
69 import io.grpc.internal.AbstractStream;
70 import io.grpc.internal.ClientStreamListener;
71 import io.grpc.internal.ClientTransport;
72 import io.grpc.internal.GrpcUtil;
73 import io.grpc.internal.ManagedClientTransport;
74 import io.grpc.internal.ProxyParameters;
75 import io.grpc.internal.TransportTracer;
76 import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
77 import io.grpc.okhttp.internal.ConnectionSpec;
78 import io.grpc.okhttp.internal.framed.ErrorCode;
79 import io.grpc.okhttp.internal.framed.FrameReader;
80 import io.grpc.okhttp.internal.framed.FrameWriter;
81 import io.grpc.okhttp.internal.framed.Header;
82 import io.grpc.okhttp.internal.framed.HeadersMode;
83 import io.grpc.okhttp.internal.framed.Settings;
84 import io.grpc.testing.TestMethodDescriptors;
85 import java.io.BufferedReader;
86 import java.io.ByteArrayInputStream;
87 import java.io.IOException;
88 import java.io.InputStream;
89 import java.io.InputStreamReader;
90 import java.net.InetSocketAddress;
91 import java.net.ServerSocket;
92 import java.net.Socket;
93 import java.net.SocketAddress;
94 import java.util.ArrayList;
95 import java.util.Arrays;
96 import java.util.List;
97 import java.util.concurrent.CountDownLatch;
98 import java.util.concurrent.ExecutionException;
99 import java.util.concurrent.ExecutorService;
100 import java.util.concurrent.Executors;
101 import java.util.concurrent.LinkedBlockingQueue;
102 import java.util.concurrent.TimeUnit;
103 import java.util.concurrent.atomic.AtomicBoolean;
104 import javax.annotation.Nullable;
105 import javax.net.ssl.HostnameVerifier;
106 import javax.net.ssl.SSLSocketFactory;
107 import okio.Buffer;
108 import okio.ByteString;
109 import org.junit.After;
110 import org.junit.Before;
111 import org.junit.Rule;
112 import org.junit.Test;
113 import org.junit.rules.Timeout;
114 import org.junit.runner.RunWith;
115 import org.junit.runners.JUnit4;
116 import org.mockito.ArgumentCaptor;
117 import org.mockito.InOrder;
118 import org.mockito.Matchers;
119 import org.mockito.Mock;
120 import org.mockito.MockitoAnnotations;
121 
122 /**
123  * Tests for {@link OkHttpClientTransport}.
124  */
125 @RunWith(JUnit4.class)
126 public class OkHttpClientTransportTest {
127   private static final int TIME_OUT_MS = 2000;
128   private static final String NETWORK_ISSUE_MESSAGE = "network issue";
129   private static final String ERROR_MESSAGE = "simulated error";
130   // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length.
131   private static final int HEADER_LENGTH = 5;
132   private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test");
133   private static final ProxyParameters NO_PROXY = null;
134   private static final String NO_USER = null;
135   private static final String NO_PW = null;
136   private static final int DEFAULT_START_STREAM_ID = 3;
137 
138   @Rule public final Timeout globalTimeout = Timeout.seconds(10);
139 
140   @Mock
141   private FrameWriter frameWriter;
142 
143   private MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod();
144 
145   @Mock
146   private ManagedClientTransport.Listener transportListener;
147 
148   private final SSLSocketFactory sslSocketFactory = null;
149   private final HostnameVerifier hostnameVerifier = null;
150   private final TransportTracer transportTracer = new TransportTracer();
151   private OkHttpClientTransport clientTransport;
152   private MockFrameReader frameReader;
153   private ExecutorService executor = Executors.newCachedThreadPool();
154   private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
155   private SettableFuture<Void> connectedFuture;
156   private DelayConnectedCallback delayConnectedCallback;
157   private Runnable tooManyPingsRunnable = new Runnable() {
158     @Override public void run() {
159       throw new AssertionError();
160     }
161   };
162 
163   /** Set up for test. */
164   @Before
setUp()165   public void setUp() {
166     MockitoAnnotations.initMocks(this);
167     when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE);
168     frameReader = new MockFrameReader();
169   }
170 
171   @After
tearDown()172   public void tearDown() {
173     executor.shutdownNow();
174   }
175 
initTransport()176   private void initTransport() throws Exception {
177     startTransport(DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, null);
178   }
179 
initTransport(int startId)180   private void initTransport(int startId) throws Exception {
181     startTransport(startId, null, true, DEFAULT_MAX_MESSAGE_SIZE, null);
182   }
183 
initTransportAndDelayConnected()184   private void initTransportAndDelayConnected() throws Exception {
185     delayConnectedCallback = new DelayConnectedCallback();
186     startTransport(
187         DEFAULT_START_STREAM_ID, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null);
188   }
189 
startTransport(int startId, @Nullable Runnable connectingCallback, boolean waitingForConnected, int maxMessageSize, String userAgent)190   private void startTransport(int startId, @Nullable Runnable connectingCallback,
191       boolean waitingForConnected, int maxMessageSize, String userAgent) throws Exception {
192     connectedFuture = SettableFuture.create();
193     final Ticker ticker = new Ticker() {
194       @Override
195       public long read() {
196         return nanoTime;
197       }
198     };
199     Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() {
200       @Override
201       public Stopwatch get() {
202         return Stopwatch.createUnstarted(ticker);
203       }
204     };
205     clientTransport = new OkHttpClientTransport(
206         userAgent,
207         executor,
208         frameReader,
209         frameWriter,
210         startId,
211         new MockSocket(frameReader),
212         stopwatchSupplier,
213         connectingCallback,
214         connectedFuture,
215         maxMessageSize,
216         tooManyPingsRunnable,
217         new TransportTracer());
218     clientTransport.start(transportListener);
219     if (waitingForConnected) {
220       connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS);
221     }
222   }
223 
224   @Test
testToString()225   public void testToString() throws Exception {
226     InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
227     clientTransport = new OkHttpClientTransport(
228         address,
229         "hostname",
230         /*agent=*/ null,
231         executor,
232         sslSocketFactory,
233         hostnameVerifier,
234         OkHttpChannelBuilder.INTERNAL_DEFAULT_CONNECTION_SPEC,
235         DEFAULT_MAX_MESSAGE_SIZE,
236         NO_PROXY,
237         tooManyPingsRunnable,
238         transportTracer);
239     String s = clientTransport.toString();
240     assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport"));
241     assertTrue("Unexpected: " + s, s.contains(address.toString()));
242   }
243 
244   @Test
maxMessageSizeShouldBeEnforced()245   public void maxMessageSizeShouldBeEnforced() throws Exception {
246     // Allow the response payloads of up to 1 byte.
247     startTransport(3, null, true, 1, null);
248 
249     MockStreamListener listener = new MockStreamListener();
250     OkHttpClientStream stream =
251         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
252     stream.start(listener);
253     stream.request(1);
254     assertContainStream(3);
255     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
256     assertNotNull(listener.headers);
257 
258     // Receive the message.
259     final String message = "Hello Client";
260     Buffer buffer = createMessageFrame(message);
261     frameHandler().data(false, 3, buffer, (int) buffer.size());
262 
263     listener.waitUntilStreamClosed();
264     assertEquals(Code.RESOURCE_EXHAUSTED, listener.status.getCode());
265     shutdownAndVerify();
266   }
267 
268   /**
269    * When nextFrame throws IOException, the transport should be aborted.
270    */
271   @Test
nextFrameThrowIoException()272   public void nextFrameThrowIoException() throws Exception {
273     initTransport();
274     MockStreamListener listener1 = new MockStreamListener();
275     MockStreamListener listener2 = new MockStreamListener();
276     OkHttpClientStream stream1 =
277         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
278     stream1.start(listener1);
279     stream1.request(1);
280     OkHttpClientStream stream2 =
281         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
282     stream2.start(listener2);
283     stream2.request(1);
284     assertEquals(2, activeStreamCount());
285     assertContainStream(3);
286     assertContainStream(5);
287     frameReader.throwIoExceptionForNextFrame();
288     listener1.waitUntilStreamClosed();
289     listener2.waitUntilStreamClosed();
290 
291     assertEquals(0, activeStreamCount());
292     assertEquals(Status.UNAVAILABLE.getCode(), listener1.status.getCode());
293     assertEquals(NETWORK_ISSUE_MESSAGE, listener1.status.getCause().getMessage());
294     assertEquals(Status.UNAVAILABLE.getCode(), listener2.status.getCode());
295     assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage());
296     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
297     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
298     shutdownAndVerify();
299   }
300 
301   /**
302    * Test that even if an Error is thrown from the reading loop of the transport,
303    * it can still clean up and call transportShutdown() and transportTerminated() as expected
304    * by the channel.
305    */
306   @Test
nextFrameThrowsError()307   public void nextFrameThrowsError() throws Exception {
308     initTransport();
309     MockStreamListener listener = new MockStreamListener();
310     OkHttpClientStream stream =
311         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
312     stream.start(listener);
313     stream.request(1);
314     assertEquals(1, activeStreamCount());
315     assertContainStream(3);
316     frameReader.throwErrorForNextFrame();
317     listener.waitUntilStreamClosed();
318 
319     assertEquals(0, activeStreamCount());
320     assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
321     assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage());
322     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
323     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
324     shutdownAndVerify();
325   }
326 
327   @Test
nextFrameReturnFalse()328   public void nextFrameReturnFalse() throws Exception {
329     initTransport();
330     MockStreamListener listener = new MockStreamListener();
331     OkHttpClientStream stream =
332         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
333     stream.start(listener);
334     stream.request(1);
335     frameReader.nextFrameAtEndOfStream();
336     listener.waitUntilStreamClosed();
337     assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
338     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
339     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
340     shutdownAndVerify();
341   }
342 
343   @Test
readMessages()344   public void readMessages() throws Exception {
345     initTransport();
346     final int numMessages = 10;
347     final String message = "Hello Client";
348     MockStreamListener listener = new MockStreamListener();
349     OkHttpClientStream stream =
350         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
351     stream.start(listener);
352     stream.request(numMessages);
353     assertContainStream(3);
354     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
355     assertNotNull(listener.headers);
356     for (int i = 0; i < numMessages; i++) {
357       Buffer buffer = createMessageFrame(message + i);
358       frameHandler().data(false, 3, buffer, (int) buffer.size());
359     }
360     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
361     listener.waitUntilStreamClosed();
362     assertEquals(Status.OK, listener.status);
363     assertNotNull(listener.trailers);
364     assertEquals(numMessages, listener.messages.size());
365     for (int i = 0; i < numMessages; i++) {
366       assertEquals(message + i, listener.messages.get(i));
367     }
368     shutdownAndVerify();
369   }
370 
371   @Test
receivedHeadersForInvalidStreamShouldKillConnection()372   public void receivedHeadersForInvalidStreamShouldKillConnection() throws Exception {
373     initTransport();
374     // Empty headers block without correct content type or status
375     frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(),
376         HeadersMode.HTTP_20_HEADERS);
377     verify(frameWriter, timeout(TIME_OUT_MS))
378         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
379     verify(transportListener).transportShutdown(isA(Status.class));
380     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
381     shutdownAndVerify();
382   }
383 
384   @Test
receivedDataForInvalidStreamShouldKillConnection()385   public void receivedDataForInvalidStreamShouldKillConnection() throws Exception {
386     initTransport();
387     frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
388     verify(frameWriter, timeout(TIME_OUT_MS))
389         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
390     verify(transportListener).transportShutdown(isA(Status.class));
391     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
392     shutdownAndVerify();
393   }
394 
395   @Test
invalidInboundHeadersCancelStream()396   public void invalidInboundHeadersCancelStream() throws Exception {
397     initTransport();
398     MockStreamListener listener = new MockStreamListener();
399     OkHttpClientStream stream =
400         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
401     stream.start(listener);
402     stream.request(1);
403     assertContainStream(3);
404     // Headers block without correct content type or status
405     frameHandler().headers(false, false, 3, 0, Arrays.asList(new Header("random", "4")),
406         HeadersMode.HTTP_20_HEADERS);
407     // Now wait to receive 1000 bytes of data so we can have a better error message before
408     // cancelling the streaam.
409     frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000);
410     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
411     assertNull(listener.headers);
412     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
413     assertNotNull(listener.trailers);
414     assertEquals("4", listener.trailers
415         .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
416     shutdownAndVerify();
417   }
418 
419   @Test
invalidInboundTrailersPropagateToMetadata()420   public void invalidInboundTrailersPropagateToMetadata() throws Exception {
421     initTransport();
422     MockStreamListener listener = new MockStreamListener();
423     OkHttpClientStream stream =
424         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
425     stream.start(listener);
426     stream.request(1);
427     assertContainStream(3);
428     // Headers block with EOS without correct content type or status
429     frameHandler().headers(true, true, 3, 0, Arrays.asList(new Header("random", "4")),
430         HeadersMode.HTTP_20_HEADERS);
431     assertNull(listener.headers);
432     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
433     assertNotNull(listener.trailers);
434     assertEquals("4", listener.trailers
435         .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
436     shutdownAndVerify();
437   }
438 
439   @Test
readStatus()440   public void readStatus() throws Exception {
441     initTransport();
442     MockStreamListener listener = new MockStreamListener();
443     OkHttpClientStream stream =
444         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
445     stream.start(listener);
446     assertContainStream(3);
447     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
448     listener.waitUntilStreamClosed();
449     assertEquals(Status.Code.OK, listener.status.getCode());
450     shutdownAndVerify();
451   }
452 
453   @Test
receiveReset()454   public void receiveReset() throws Exception {
455     initTransport();
456     MockStreamListener listener = new MockStreamListener();
457     OkHttpClientStream stream =
458         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
459     stream.start(listener);
460     assertContainStream(3);
461     frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR);
462     listener.waitUntilStreamClosed();
463 
464     assertThat(listener.status.getDescription()).contains("Rst Stream");
465     assertThat(listener.status.getCode()).isEqualTo(Code.INTERNAL);
466     shutdownAndVerify();
467   }
468 
469   @Test
cancelStream()470   public void cancelStream() throws Exception {
471     initTransport();
472     MockStreamListener listener = new MockStreamListener();
473     OkHttpClientStream stream =
474         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
475     stream.start(listener);
476     getStream(3).cancel(Status.CANCELLED);
477     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
478     listener.waitUntilStreamClosed();
479     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
480         listener.status.getCode());
481     shutdownAndVerify();
482   }
483 
484   @Test
addDefaultUserAgent()485   public void addDefaultUserAgent() throws Exception {
486     initTransport();
487     MockStreamListener listener = new MockStreamListener();
488     OkHttpClientStream stream =
489         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
490     stream.start(listener);
491     Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(),
492             GrpcUtil.getGrpcUserAgent("okhttp", null));
493     List<Header> expectedHeaders = Arrays.asList(SCHEME_HEADER, METHOD_HEADER,
494             new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"),
495             new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()),
496             userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER);
497     verify(frameWriter, timeout(TIME_OUT_MS))
498         .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
499     getStream(3).cancel(Status.CANCELLED);
500     shutdownAndVerify();
501   }
502 
503   @Test
overrideDefaultUserAgent()504   public void overrideDefaultUserAgent() throws Exception {
505     startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, "fakeUserAgent");
506     MockStreamListener listener = new MockStreamListener();
507     OkHttpClientStream stream =
508         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
509     stream.start(listener);
510     List<Header> expectedHeaders = Arrays.asList(SCHEME_HEADER, METHOD_HEADER,
511         new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"),
512         new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()),
513         new Header(GrpcUtil.USER_AGENT_KEY.name(),
514             GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent")),
515         CONTENT_TYPE_HEADER, TE_HEADER);
516     verify(frameWriter, timeout(TIME_OUT_MS))
517         .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
518     getStream(3).cancel(Status.CANCELLED);
519     shutdownAndVerify();
520   }
521 
522   @Test
cancelStreamForDeadlineExceeded()523   public void cancelStreamForDeadlineExceeded() throws Exception {
524     initTransport();
525     MockStreamListener listener = new MockStreamListener();
526     OkHttpClientStream stream =
527         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
528     stream.start(listener);
529     getStream(3).cancel(Status.DEADLINE_EXCEEDED);
530     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
531     listener.waitUntilStreamClosed();
532     shutdownAndVerify();
533   }
534 
535   @Test
writeMessage()536   public void writeMessage() throws Exception {
537     initTransport();
538     final String message = "Hello Server";
539     MockStreamListener listener = new MockStreamListener();
540     OkHttpClientStream stream =
541         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
542     stream.start(listener);
543     InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
544     assertEquals(12, input.available());
545     stream.writeMessage(input);
546     stream.flush();
547     ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
548     verify(frameWriter, timeout(TIME_OUT_MS))
549         .data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH));
550     Buffer sentFrame = captor.getValue();
551     assertEquals(createMessageFrame(message), sentFrame);
552     stream.cancel(Status.CANCELLED);
553     shutdownAndVerify();
554   }
555 
556   @Test
transportTracer_windowSizeDefault()557   public void transportTracer_windowSizeDefault() throws Exception {
558     initTransport();
559     TransportStats stats = getTransportStats(clientTransport);
560     assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
561     // okhttp does not track local window sizes
562     assertEquals(-1, stats.localFlowControlWindow);
563   }
564 
565   @Test
transportTracer_windowSize_remote()566   public void transportTracer_windowSize_remote() throws Exception {
567     initTransport();
568     TransportStats before = getTransportStats(clientTransport);
569     assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
570     // okhttp does not track local window sizes
571     assertEquals(-1, before.localFlowControlWindow);
572 
573     frameHandler().windowUpdate(0, 1000);
574     TransportStats after = getTransportStats(clientTransport);
575     assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow);
576     // okhttp does not track local window sizes
577     assertEquals(-1, after.localFlowControlWindow);
578   }
579 
580   @Test
windowUpdate()581   public void windowUpdate() throws Exception {
582     initTransport();
583     MockStreamListener listener1 = new MockStreamListener();
584     MockStreamListener listener2 = new MockStreamListener();
585     OkHttpClientStream stream1 =
586         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
587     stream1.start(listener1);
588     stream1.request(2);
589 
590     OkHttpClientStream stream2 =
591         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
592     stream2.start(listener2);
593     stream2.request(2);
594     assertEquals(2, activeStreamCount());
595     stream1 = getStream(3);
596     stream2 = getStream(5);
597 
598     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
599     frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
600 
601     int messageLength = Utils.DEFAULT_WINDOW_SIZE / 4;
602     byte[] fakeMessage = new byte[messageLength];
603 
604     // Stream 1 receives a message
605     Buffer buffer = createMessageFrame(fakeMessage);
606     int messageFrameLength = (int) buffer.size();
607     frameHandler().data(false, 3, buffer, messageFrameLength);
608 
609     // Stream 2 receives a message
610     buffer = createMessageFrame(fakeMessage);
611     frameHandler().data(false, 5, buffer, messageFrameLength);
612 
613     verify(frameWriter, timeout(TIME_OUT_MS))
614         .windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
615     reset(frameWriter);
616 
617     // Stream 1 receives another message
618     buffer = createMessageFrame(fakeMessage);
619     frameHandler().data(false, 3, buffer, messageFrameLength);
620 
621     verify(frameWriter, timeout(TIME_OUT_MS))
622         .windowUpdate(eq(3), eq((long) 2 * messageFrameLength));
623 
624     // Stream 2 receives another message
625     buffer = createMessageFrame(fakeMessage);
626     frameHandler().data(false, 5, buffer, messageFrameLength);
627 
628     verify(frameWriter, timeout(TIME_OUT_MS))
629         .windowUpdate(eq(5), eq((long) 2 * messageFrameLength));
630     verify(frameWriter, timeout(TIME_OUT_MS))
631         .windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
632 
633     stream1.cancel(Status.CANCELLED);
634     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
635     listener1.waitUntilStreamClosed();
636     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
637         listener1.status.getCode());
638 
639     stream2.cancel(Status.CANCELLED);
640     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(5), eq(ErrorCode.CANCEL));
641     listener2.waitUntilStreamClosed();
642     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
643         listener2.status.getCode());
644     shutdownAndVerify();
645   }
646 
647   @Test
windowUpdateWithInboundFlowControl()648   public void windowUpdateWithInboundFlowControl() throws Exception {
649     initTransport();
650     MockStreamListener listener = new MockStreamListener();
651     OkHttpClientStream stream =
652         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
653     stream.start(listener);
654     int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1;
655     byte[] fakeMessage = new byte[messageLength];
656 
657     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
658     Buffer buffer = createMessageFrame(fakeMessage);
659     long messageFrameLength = buffer.size();
660     frameHandler().data(false, 3, buffer, (int) messageFrameLength);
661     ArgumentCaptor<Integer> idCaptor = ArgumentCaptor.forClass(Integer.class);
662     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(
663         idCaptor.capture(), eq(messageFrameLength));
664     // Should only send window update for the connection.
665     assertEquals(1, idCaptor.getAllValues().size());
666     assertEquals(0, (int)idCaptor.getValue());
667 
668     stream.request(1);
669     // We return the bytes for the stream window as we read the message.
670     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength));
671 
672     getStream(3).cancel(Status.CANCELLED);
673     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
674     listener.waitUntilStreamClosed();
675     assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(),
676         listener.status.getCode());
677     shutdownAndVerify();
678   }
679 
680   @Test
outboundFlowControl()681   public void outboundFlowControl() throws Exception {
682     initTransport();
683     MockStreamListener listener = new MockStreamListener();
684     OkHttpClientStream stream =
685         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
686     stream.start(listener);
687     // The first message should be sent out.
688     int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1;
689     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
690     stream.writeMessage(input);
691     stream.flush();
692     verify(frameWriter, timeout(TIME_OUT_MS)).data(
693         eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH));
694 
695 
696     // The second message should be partially sent out.
697     input = new ByteArrayInputStream(new byte[messageLength]);
698     stream.writeMessage(input);
699     stream.flush();
700     int partiallySentSize =
701         Utils.DEFAULT_WINDOW_SIZE - messageLength - HEADER_LENGTH;
702     verify(frameWriter, timeout(TIME_OUT_MS))
703         .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize));
704 
705     // Get more credit, the rest data should be sent out.
706     frameHandler().windowUpdate(3, Utils.DEFAULT_WINDOW_SIZE);
707     frameHandler().windowUpdate(0, Utils.DEFAULT_WINDOW_SIZE);
708     verify(frameWriter, timeout(TIME_OUT_MS)).data(
709         eq(false), eq(3), any(Buffer.class),
710         eq(messageLength + HEADER_LENGTH - partiallySentSize));
711 
712     stream.cancel(Status.CANCELLED);
713     listener.waitUntilStreamClosed();
714     shutdownAndVerify();
715   }
716 
717   @Test
outboundFlowControlWithInitialWindowSizeChange()718   public void outboundFlowControlWithInitialWindowSizeChange() throws Exception {
719     initTransport();
720     MockStreamListener listener = new MockStreamListener();
721     OkHttpClientStream stream =
722         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
723     stream.start(listener);
724     int messageLength = 20;
725     setInitialWindowSize(HEADER_LENGTH + 10);
726     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
727     stream.writeMessage(input);
728     stream.flush();
729     // part of the message can be sent.
730     verify(frameWriter, timeout(TIME_OUT_MS))
731         .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10));
732     // Avoid connection flow control.
733     frameHandler().windowUpdate(0, HEADER_LENGTH + 10);
734 
735     // Increase initial window size
736     setInitialWindowSize(HEADER_LENGTH + 20);
737     // The rest data should be sent.
738     verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(false), eq(3), any(Buffer.class), eq(10));
739     frameHandler().windowUpdate(0, 10);
740 
741     // Decrease initial window size to HEADER_LENGTH, since we've already sent
742     // out HEADER_LENGTH + 20 bytes data, the window size should be -20 now.
743     setInitialWindowSize(HEADER_LENGTH);
744     // Get 20 tokens back, still can't send any data.
745     frameHandler().windowUpdate(3, 20);
746     input = new ByteArrayInputStream(new byte[messageLength]);
747     stream.writeMessage(input);
748     stream.flush();
749     // Only the previous two write operations happened.
750     verify(frameWriter, timeout(TIME_OUT_MS).times(2))
751         .data(anyBoolean(), anyInt(), any(Buffer.class), anyInt());
752 
753     // Get enough tokens to send the pending message.
754     frameHandler().windowUpdate(3, HEADER_LENGTH + 20);
755     verify(frameWriter, timeout(TIME_OUT_MS))
756         .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 20));
757 
758     stream.cancel(Status.CANCELLED);
759     listener.waitUntilStreamClosed();
760     shutdownAndVerify();
761   }
762 
763   @Test
outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream()764   public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception {
765     initTransport();
766     MockStreamListener listener = new MockStreamListener();
767     OkHttpClientStream stream =
768         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
769     stream.start(listener);
770     int messageLength = 20;
771     setInitialWindowSize(HEADER_LENGTH + 10);
772     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
773     stream.writeMessage(input);
774     stream.flush();
775     // part of the message can be sent.
776     verify(frameWriter, timeout(TIME_OUT_MS))
777         .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10));
778     // Avoid connection flow control.
779     frameHandler().windowUpdate(0, HEADER_LENGTH + 20);
780 
781     // Increase initial window size
782     setInitialWindowSize(HEADER_LENGTH + 20);
783 
784     // wait until pending frames sent (inOrder doesn't support timeout)
785     verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce())
786         .data(eq(false), eq(3), any(Buffer.class), eq(10));
787     // It should ack the settings, then send remaining message.
788     InOrder inOrder = inOrder(frameWriter);
789     inOrder.verify(frameWriter).ackSettings(any(Settings.class));
790     inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10));
791 
792     stream.cancel(Status.CANCELLED);
793     listener.waitUntilStreamClosed();
794     shutdownAndVerify();
795   }
796 
797   @Test
stopNormally()798   public void stopNormally() throws Exception {
799     initTransport();
800     MockStreamListener listener1 = new MockStreamListener();
801     MockStreamListener listener2 = new MockStreamListener();
802     OkHttpClientStream stream1 =
803         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
804     stream1.start(listener1);
805     OkHttpClientStream stream2 =
806         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
807     stream2.start(listener2);
808     assertEquals(2, activeStreamCount());
809     clientTransport.shutdown(SHUTDOWN_REASON);
810 
811     assertEquals(2, activeStreamCount());
812     verify(transportListener).transportShutdown(same(SHUTDOWN_REASON));
813 
814     stream1.cancel(Status.CANCELLED);
815     stream2.cancel(Status.CANCELLED);
816     listener1.waitUntilStreamClosed();
817     listener2.waitUntilStreamClosed();
818     assertEquals(0, activeStreamCount());
819     assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode());
820     assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
821     verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
822     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
823     shutdownAndVerify();
824   }
825 
826   @Test
receiveGoAway()827   public void receiveGoAway() throws Exception {
828     initTransport();
829     // start 2 streams.
830     MockStreamListener listener1 = new MockStreamListener();
831     MockStreamListener listener2 = new MockStreamListener();
832     OkHttpClientStream stream1 =
833         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
834     stream1.start(listener1);
835     stream1.request(1);
836     OkHttpClientStream stream2 =
837         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
838     stream2.start(listener2);
839     stream2.request(1);
840     assertEquals(2, activeStreamCount());
841 
842     // Receive goAway, max good id is 3.
843     frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY);
844 
845     // Transport should be in STOPPING state.
846     verify(transportListener).transportShutdown(isA(Status.class));
847     verify(transportListener, never()).transportTerminated();
848 
849     // Stream 2 should be closed.
850     listener2.waitUntilStreamClosed();
851     assertEquals(1, activeStreamCount());
852     assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
853 
854     // New stream should be failed.
855     assertNewStreamFail();
856 
857     // But stream 1 should be able to send.
858     final String sentMessage = "Should I also go away?";
859     OkHttpClientStream stream = getStream(3);
860     InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
861     assertEquals(22, input.available());
862     stream.writeMessage(input);
863     stream.flush();
864     ArgumentCaptor<Buffer> captor =
865         ArgumentCaptor.forClass(Buffer.class);
866     verify(frameWriter, timeout(TIME_OUT_MS))
867         .data(eq(false), eq(3), captor.capture(), eq(22 + HEADER_LENGTH));
868     Buffer sentFrame = captor.getValue();
869     assertEquals(createMessageFrame(sentMessage), sentFrame);
870 
871     // And read.
872     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
873     final String receivedMessage = "No, you are fine.";
874     Buffer buffer = createMessageFrame(receivedMessage);
875     frameHandler().data(false, 3, buffer, (int) buffer.size());
876     frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
877     listener1.waitUntilStreamClosed();
878     assertEquals(1, listener1.messages.size());
879     assertEquals(receivedMessage, listener1.messages.get(0));
880 
881     // The transport should be stopped after all active streams finished.
882     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
883     shutdownAndVerify();
884   }
885 
886   @Test
streamIdExhausted()887   public void streamIdExhausted() throws Exception {
888     int startId = Integer.MAX_VALUE - 2;
889     initTransport(startId);
890 
891     MockStreamListener listener = new MockStreamListener();
892     OkHttpClientStream stream =
893         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
894     stream.start(listener);
895     stream.request(1);
896 
897     // New stream should be failed.
898     assertNewStreamFail();
899 
900     // The alive stream should be functional, receives a message.
901     frameHandler().headers(
902         false, false, startId, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
903     assertNotNull(listener.headers);
904     String message = "hello";
905     Buffer buffer = createMessageFrame(message);
906     frameHandler().data(false, startId, buffer, (int) buffer.size());
907 
908     getStream(startId).cancel(Status.CANCELLED);
909     // Receives the second message after be cancelled.
910     buffer = createMessageFrame(message);
911     frameHandler().data(false, startId, buffer, (int) buffer.size());
912 
913     listener.waitUntilStreamClosed();
914     // Should only have the first message delivered.
915     assertEquals(message, listener.messages.get(0));
916     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL));
917     verify(transportListener).transportShutdown(isA(Status.class));
918     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
919     shutdownAndVerify();
920   }
921 
922   @Test
pendingStreamSucceed()923   public void pendingStreamSucceed() throws Exception {
924     initTransport();
925     setMaxConcurrentStreams(1);
926     final MockStreamListener listener1 = new MockStreamListener();
927     final MockStreamListener listener2 = new MockStreamListener();
928     OkHttpClientStream stream1 =
929         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
930     stream1.start(listener1);
931     // The second stream should be pending.
932     OkHttpClientStream stream2 =
933         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
934     stream2.start(listener2);
935     String sentMessage = "hello";
936     InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
937     assertEquals(5, input.available());
938     stream2.writeMessage(input);
939     stream2.flush();
940     stream2.halfClose();
941 
942     waitForStreamPending(1);
943     assertEquals(1, activeStreamCount());
944 
945     // Finish the first stream
946     stream1.cancel(Status.CANCELLED);
947     listener1.waitUntilStreamClosed();
948 
949     // The second stream should be active now, and the pending data should be sent out.
950     assertEquals(1, activeStreamCount());
951     assertEquals(0, clientTransport.getPendingStreamSize());
952     ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
953     verify(frameWriter, timeout(TIME_OUT_MS))
954         .data(eq(false), eq(5), captor.capture(), eq(5 + HEADER_LENGTH));
955     Buffer sentFrame = captor.getValue();
956     assertEquals(createMessageFrame(sentMessage), sentFrame);
957     verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0));
958     stream2.cancel(Status.CANCELLED);
959     shutdownAndVerify();
960   }
961 
962   @Test
pendingStreamCancelled()963   public void pendingStreamCancelled() throws Exception {
964     initTransport();
965     setMaxConcurrentStreams(0);
966     MockStreamListener listener = new MockStreamListener();
967     OkHttpClientStream stream =
968         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
969     stream.start(listener);
970     waitForStreamPending(1);
971     stream.cancel(Status.CANCELLED);
972     // The second cancel should be an no-op.
973     stream.cancel(Status.UNKNOWN);
974     listener.waitUntilStreamClosed();
975     assertEquals(0, clientTransport.getPendingStreamSize());
976     assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
977     shutdownAndVerify();
978   }
979 
980   @Test
pendingStreamFailedByGoAway()981   public void pendingStreamFailedByGoAway() throws Exception {
982     initTransport();
983     setMaxConcurrentStreams(1);
984     final MockStreamListener listener1 = new MockStreamListener();
985     final MockStreamListener listener2 = new MockStreamListener();
986     OkHttpClientStream stream1 =
987         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
988     stream1.start(listener1);
989     // The second stream should be pending.
990     OkHttpClientStream stream2 =
991         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
992     stream2.start(listener2);
993 
994     waitForStreamPending(1);
995     assertEquals(1, activeStreamCount());
996 
997     // Receives GO_AWAY.
998     frameHandler().goAway(99, ErrorCode.CANCEL, ByteString.EMPTY);
999 
1000     listener2.waitUntilStreamClosed();
1001     assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode());
1002     assertEquals(0, clientTransport.getPendingStreamSize());
1003 
1004     // active stream should not be affected.
1005     assertEquals(1, activeStreamCount());
1006     getStream(3).cancel(Status.CANCELLED);
1007     shutdownAndVerify();
1008   }
1009 
1010   @Test
pendingStreamSucceedAfterShutdown()1011   public void pendingStreamSucceedAfterShutdown() throws Exception {
1012     initTransport();
1013     setMaxConcurrentStreams(0);
1014     final MockStreamListener listener = new MockStreamListener();
1015     // The second stream should be pending.
1016     OkHttpClientStream stream =
1017         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1018     stream.start(listener);
1019     waitForStreamPending(1);
1020 
1021     clientTransport.shutdown(SHUTDOWN_REASON);
1022     setMaxConcurrentStreams(1);
1023     verify(frameWriter, timeout(TIME_OUT_MS))
1024         .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
1025     assertEquals(1, activeStreamCount());
1026     stream.cancel(Status.CANCELLED);
1027     shutdownAndVerify();
1028   }
1029 
1030   @Test
pendingStreamFailedByIdExhausted()1031   public void pendingStreamFailedByIdExhausted() throws Exception {
1032     int startId = Integer.MAX_VALUE - 4;
1033     initTransport(startId);
1034     setMaxConcurrentStreams(1);
1035 
1036     final MockStreamListener listener1 = new MockStreamListener();
1037     final MockStreamListener listener2 = new MockStreamListener();
1038     final MockStreamListener listener3 = new MockStreamListener();
1039 
1040     OkHttpClientStream stream1 =
1041         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1042     stream1.start(listener1);
1043 
1044     // The second and third stream should be pending.
1045     OkHttpClientStream stream2 =
1046         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1047     stream2.start(listener2);
1048     OkHttpClientStream stream3 =
1049         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1050     stream3.start(listener3);
1051 
1052     waitForStreamPending(2);
1053     assertEquals(1, activeStreamCount());
1054 
1055     // Now finish stream1, stream2 should be started and exhaust the id,
1056     // so stream3 should be failed.
1057     stream1.cancel(Status.CANCELLED);
1058     listener1.waitUntilStreamClosed();
1059     listener3.waitUntilStreamClosed();
1060     assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode());
1061     assertEquals(0, clientTransport.getPendingStreamSize());
1062     assertEquals(1, activeStreamCount());
1063     stream2 = getStream(startId + 2);
1064     stream2.cancel(Status.CANCELLED);
1065     shutdownAndVerify();
1066   }
1067 
1068   @Test
receivingWindowExceeded()1069   public void receivingWindowExceeded() throws Exception {
1070     initTransport();
1071     MockStreamListener listener = new MockStreamListener();
1072     OkHttpClientStream stream =
1073         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1074     stream.start(listener);
1075     stream.request(1);
1076 
1077     frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
1078 
1079     int messageLength = Utils.DEFAULT_WINDOW_SIZE + 1;
1080     byte[] fakeMessage = new byte[messageLength];
1081     Buffer buffer = createMessageFrame(fakeMessage);
1082     int messageFrameLength = (int) buffer.size();
1083     frameHandler().data(false, 3, buffer, messageFrameLength);
1084 
1085     listener.waitUntilStreamClosed();
1086     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1087     assertEquals("Received data size exceeded our receiving window size",
1088         listener.status.getDescription());
1089     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.FLOW_CONTROL_ERROR));
1090     shutdownAndVerify();
1091   }
1092 
1093   @Test
unaryHeadersShouldNotBeFlushed()1094   public void unaryHeadersShouldNotBeFlushed() throws Exception {
1095     // By default the method is a Unary call
1096     shouldHeadersBeFlushed(false);
1097     shutdownAndVerify();
1098   }
1099 
1100   @Test
serverStreamingHeadersShouldNotBeFlushed()1101   public void serverStreamingHeadersShouldNotBeFlushed() throws Exception {
1102     method = method.toBuilder().setType(MethodType.SERVER_STREAMING).build();
1103     shouldHeadersBeFlushed(false);
1104     shutdownAndVerify();
1105   }
1106 
1107   @Test
clientStreamingHeadersShouldBeFlushed()1108   public void clientStreamingHeadersShouldBeFlushed() throws Exception {
1109     method = method.toBuilder().setType(MethodType.CLIENT_STREAMING).build();
1110     shouldHeadersBeFlushed(true);
1111     shutdownAndVerify();
1112   }
1113 
1114   @Test
duplexStreamingHeadersShouldNotBeFlushed()1115   public void duplexStreamingHeadersShouldNotBeFlushed() throws Exception {
1116     method = method.toBuilder().setType(MethodType.BIDI_STREAMING).build();
1117     shouldHeadersBeFlushed(true);
1118     shutdownAndVerify();
1119   }
1120 
shouldHeadersBeFlushed(boolean shouldBeFlushed)1121   private void shouldHeadersBeFlushed(boolean shouldBeFlushed) throws Exception {
1122     initTransport();
1123     MockStreamListener listener = new MockStreamListener();
1124     OkHttpClientStream stream =
1125         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1126     stream.start(listener);
1127     verify(frameWriter, timeout(TIME_OUT_MS)).synStream(
1128         eq(false), eq(false), eq(3), eq(0), Matchers.anyListOf(Header.class));
1129     if (shouldBeFlushed) {
1130       verify(frameWriter, timeout(TIME_OUT_MS)).flush();
1131     } else {
1132       verify(frameWriter, timeout(TIME_OUT_MS).times(0)).flush();
1133     }
1134     stream.cancel(Status.CANCELLED);
1135   }
1136 
1137   @Test
receiveDataWithoutHeader()1138   public void receiveDataWithoutHeader() throws Exception {
1139     initTransport();
1140     MockStreamListener listener = new MockStreamListener();
1141     OkHttpClientStream stream =
1142         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1143     stream.start(listener);
1144     stream.request(1);
1145     Buffer buffer = createMessageFrame(new byte[1]);
1146     frameHandler().data(false, 3, buffer, (int) buffer.size());
1147 
1148     // Trigger the failure by a trailer.
1149     frameHandler().headers(
1150         true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
1151 
1152     listener.waitUntilStreamClosed();
1153     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1154     assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
1155     assertEquals(0, listener.messages.size());
1156     shutdownAndVerify();
1157   }
1158 
1159   @Test
receiveDataWithoutHeaderAndTrailer()1160   public void receiveDataWithoutHeaderAndTrailer() throws Exception {
1161     initTransport();
1162     MockStreamListener listener = new MockStreamListener();
1163     OkHttpClientStream stream =
1164         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1165     stream.start(listener);
1166     stream.request(1);
1167     Buffer buffer = createMessageFrame(new byte[1]);
1168     frameHandler().data(false, 3, buffer, (int) buffer.size());
1169 
1170     // Trigger the failure by a data frame.
1171     buffer = createMessageFrame(new byte[1]);
1172     frameHandler().data(true, 3, buffer, (int) buffer.size());
1173 
1174     listener.waitUntilStreamClosed();
1175     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1176     assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
1177     assertEquals(0, listener.messages.size());
1178     shutdownAndVerify();
1179   }
1180 
1181   @Test
receiveLongEnoughDataWithoutHeaderAndTrailer()1182   public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
1183     initTransport();
1184     MockStreamListener listener = new MockStreamListener();
1185     OkHttpClientStream stream =
1186         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1187     stream.start(listener);
1188     stream.request(1);
1189     Buffer buffer = createMessageFrame(new byte[1000]);
1190     frameHandler().data(false, 3, buffer, (int) buffer.size());
1191 
1192     // Once we receive enough detail, we cancel the stream. so we should have sent cancel.
1193     verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
1194 
1195     listener.waitUntilStreamClosed();
1196     assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
1197     assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
1198     assertEquals(0, listener.messages.size());
1199     shutdownAndVerify();
1200   }
1201 
1202   @Test
receiveDataForUnknownStreamUpdateConnectionWindow()1203   public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception {
1204     initTransport();
1205     MockStreamListener listener = new MockStreamListener();
1206     OkHttpClientStream stream =
1207         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1208     stream.start(listener);
1209     stream.cancel(Status.CANCELLED);
1210 
1211     Buffer buffer = createMessageFrame(
1212         new byte[Utils.DEFAULT_WINDOW_SIZE / 2 + 1]);
1213     frameHandler().data(false, 3, buffer, (int) buffer.size());
1214     // Should still update the connection window even stream 3 is gone.
1215     verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0,
1216         HEADER_LENGTH + Utils.DEFAULT_WINDOW_SIZE / 2 + 1);
1217     buffer = createMessageFrame(
1218         new byte[Utils.DEFAULT_WINDOW_SIZE / 2 + 1]);
1219 
1220     // This should kill the connection, since we never created stream 5.
1221     frameHandler().data(false, 5, buffer, (int) buffer.size());
1222     verify(frameWriter, timeout(TIME_OUT_MS))
1223         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
1224     verify(transportListener).transportShutdown(isA(Status.class));
1225     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1226     shutdownAndVerify();
1227   }
1228 
1229   @Test
receiveWindowUpdateForUnknownStream()1230   public void receiveWindowUpdateForUnknownStream() throws Exception {
1231     initTransport();
1232     MockStreamListener listener = new MockStreamListener();
1233     OkHttpClientStream stream =
1234         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1235     stream.start(listener);
1236     stream.cancel(Status.CANCELLED);
1237     // This should be ignored.
1238     frameHandler().windowUpdate(3, 73);
1239     listener.waitUntilStreamClosed();
1240     // This should kill the connection, since we never created stream 5.
1241     frameHandler().windowUpdate(5, 73);
1242     verify(frameWriter, timeout(TIME_OUT_MS))
1243         .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class));
1244     verify(transportListener).transportShutdown(isA(Status.class));
1245     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1246     shutdownAndVerify();
1247   }
1248 
1249   @Test
shouldBeInitiallyReady()1250   public void shouldBeInitiallyReady() throws Exception {
1251     initTransport();
1252     MockStreamListener listener = new MockStreamListener();
1253     OkHttpClientStream stream =
1254         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1255     stream.start(listener);
1256     assertTrue(stream.isReady());
1257     assertTrue(listener.isOnReadyCalled());
1258     stream.cancel(Status.CANCELLED);
1259     assertFalse(stream.isReady());
1260     shutdownAndVerify();
1261   }
1262 
1263   @Test
notifyOnReady()1264   public void notifyOnReady() throws Exception {
1265     initTransport();
1266     // exactly one byte below the threshold
1267     int messageLength =
1268         AbstractStream.TransportState.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1;
1269     setInitialWindowSize(0);
1270     MockStreamListener listener = new MockStreamListener();
1271     OkHttpClientStream stream =
1272         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1273     stream.start(listener);
1274     assertTrue(stream.isReady());
1275     // Be notified at the beginning.
1276     assertTrue(listener.isOnReadyCalled());
1277 
1278     // Write a message that will not exceed the notification threshold and queue it.
1279     InputStream input = new ByteArrayInputStream(new byte[messageLength]);
1280     stream.writeMessage(input);
1281     stream.flush();
1282     assertTrue(stream.isReady());
1283 
1284     // Write another two messages, still be queued.
1285     input = new ByteArrayInputStream(new byte[messageLength]);
1286     stream.writeMessage(input);
1287     stream.flush();
1288     assertFalse(stream.isReady());
1289     input = new ByteArrayInputStream(new byte[messageLength]);
1290     stream.writeMessage(input);
1291     stream.flush();
1292     assertFalse(stream.isReady());
1293 
1294     // Let the first message out.
1295     frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength);
1296     frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength);
1297     assertFalse(stream.isReady());
1298     assertFalse(listener.isOnReadyCalled());
1299 
1300     // Let the second message out.
1301     frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength);
1302     frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength);
1303     assertTrue(stream.isReady());
1304     assertTrue(listener.isOnReadyCalled());
1305 
1306     stream.cancel(Status.CANCELLED);
1307     shutdownAndVerify();
1308   }
1309 
1310   @Test
transportReady()1311   public void transportReady() throws Exception {
1312     initTransport();
1313     verifyZeroInteractions(transportListener);
1314     frameHandler().settings(false, new Settings());
1315     verify(transportListener).transportReady();
1316     shutdownAndVerify();
1317   }
1318 
1319   @Test
ping()1320   public void ping() throws Exception {
1321     initTransport();
1322     PingCallbackImpl callback1 = new PingCallbackImpl();
1323     clientTransport.ping(callback1, MoreExecutors.directExecutor());
1324     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1325     // add'l ping will be added as listener to outstanding operation
1326     PingCallbackImpl callback2 = new PingCallbackImpl();
1327     clientTransport.ping(callback2, MoreExecutors.directExecutor());
1328     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1329 
1330     ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class);
1331     ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class);
1332     verify(frameWriter, timeout(TIME_OUT_MS)).ping(eq(false), captor1.capture(), captor2.capture());
1333     // callback not invoked until we see acknowledgement
1334     assertEquals(0, callback1.invocationCount);
1335     assertEquals(0, callback2.invocationCount);
1336 
1337     int payload1 = captor1.getValue();
1338     int payload2 = captor2.getValue();
1339     // getting a bad ack won't complete the future
1340     // to make the ack "bad", we modify the payload so it doesn't match
1341     frameHandler().ping(true, payload1, payload2 - 1);
1342     // operation not complete because ack was wrong
1343     assertEquals(0, callback1.invocationCount);
1344     assertEquals(0, callback2.invocationCount);
1345 
1346     nanoTime += 10101;
1347 
1348     // reading the proper response should complete the future
1349     frameHandler().ping(true, payload1, payload2);
1350     assertEquals(1, callback1.invocationCount);
1351     assertEquals(10101, callback1.roundTripTime);
1352     assertNull(callback1.failureCause);
1353     // callback2 piggy-backed on same operation
1354     assertEquals(1, callback2.invocationCount);
1355     assertEquals(10101, callback2.roundTripTime);
1356     assertNull(callback2.failureCause);
1357 
1358     // now that previous ping is done, next request returns a different future
1359     callback1 = new PingCallbackImpl();
1360     clientTransport.ping(callback1, MoreExecutors.directExecutor());
1361     assertEquals(2, getTransportStats(clientTransport).keepAlivesSent);
1362     assertEquals(0, callback1.invocationCount);
1363     shutdownAndVerify();
1364   }
1365 
1366   @Test
ping_failsWhenTransportShutdown()1367   public void ping_failsWhenTransportShutdown() throws Exception {
1368     initTransport();
1369     PingCallbackImpl callback = new PingCallbackImpl();
1370     clientTransport.ping(callback, MoreExecutors.directExecutor());
1371     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1372     assertEquals(0, callback.invocationCount);
1373 
1374     clientTransport.shutdown(SHUTDOWN_REASON);
1375     // ping failed on channel shutdown
1376     assertEquals(1, callback.invocationCount);
1377     assertTrue(callback.failureCause instanceof StatusException);
1378     assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
1379 
1380     // now that handler is in terminal state, all future pings fail immediately
1381     callback = new PingCallbackImpl();
1382     clientTransport.ping(callback, MoreExecutors.directExecutor());
1383     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1384     assertEquals(1, callback.invocationCount);
1385     assertTrue(callback.failureCause instanceof StatusException);
1386     assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
1387     shutdownAndVerify();
1388   }
1389 
1390   @Test
ping_failsIfTransportFails()1391   public void ping_failsIfTransportFails() throws Exception {
1392     initTransport();
1393     PingCallbackImpl callback = new PingCallbackImpl();
1394     clientTransport.ping(callback, MoreExecutors.directExecutor());
1395     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1396     assertEquals(0, callback.invocationCount);
1397 
1398     clientTransport.onException(new IOException());
1399     // ping failed on error
1400     assertEquals(1, callback.invocationCount);
1401     assertTrue(callback.failureCause instanceof StatusException);
1402     assertEquals(Status.Code.UNAVAILABLE,
1403         ((StatusException) callback.failureCause).getStatus().getCode());
1404 
1405     // now that handler is in terminal state, all future pings fail immediately
1406     callback = new PingCallbackImpl();
1407     clientTransport.ping(callback, MoreExecutors.directExecutor());
1408     assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
1409     assertEquals(1, callback.invocationCount);
1410     assertTrue(callback.failureCause instanceof StatusException);
1411     assertEquals(Status.Code.UNAVAILABLE,
1412         ((StatusException) callback.failureCause).getStatus().getCode());
1413     shutdownAndVerify();
1414   }
1415 
1416   @Test
writeBeforeConnected()1417   public void writeBeforeConnected() throws Exception {
1418     initTransportAndDelayConnected();
1419     final String message = "Hello Server";
1420     MockStreamListener listener = new MockStreamListener();
1421     OkHttpClientStream stream =
1422         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1423     stream.start(listener);
1424     InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
1425     stream.writeMessage(input);
1426     stream.flush();
1427     // The message should be queued.
1428     verifyNoMoreInteractions(frameWriter);
1429 
1430     allowTransportConnected();
1431 
1432     // The queued message should be sent out.
1433     ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
1434     verify(frameWriter, timeout(TIME_OUT_MS))
1435         .data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH));
1436     Buffer sentFrame = captor.getValue();
1437     assertEquals(createMessageFrame(message), sentFrame);
1438     stream.cancel(Status.CANCELLED);
1439     shutdownAndVerify();
1440   }
1441 
1442   @Test
cancelBeforeConnected()1443   public void cancelBeforeConnected() throws Exception {
1444     initTransportAndDelayConnected();
1445     final String message = "Hello Server";
1446     MockStreamListener listener = new MockStreamListener();
1447     OkHttpClientStream stream =
1448         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1449     stream.start(listener);
1450     InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
1451     stream.writeMessage(input);
1452     stream.flush();
1453     stream.cancel(Status.CANCELLED);
1454     verifyNoMoreInteractions(frameWriter);
1455 
1456     allowTransportConnected();
1457     verifyNoMoreInteractions(frameWriter);
1458     shutdownAndVerify();
1459   }
1460 
1461   @Test
shutdownDuringConnecting()1462   public void shutdownDuringConnecting() throws Exception {
1463     initTransportAndDelayConnected();
1464     MockStreamListener listener = new MockStreamListener();
1465     OkHttpClientStream stream =
1466         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1467     stream.start(listener);
1468     clientTransport.shutdown(SHUTDOWN_REASON);
1469     allowTransportConnected();
1470 
1471     // The new stream should be failed, but not the pending stream.
1472     assertNewStreamFail();
1473     verify(frameWriter, timeout(TIME_OUT_MS))
1474         .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
1475     assertEquals(1, activeStreamCount());
1476     stream.cancel(Status.CANCELLED);
1477     listener.waitUntilStreamClosed();
1478     assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
1479     shutdownAndVerify();
1480   }
1481 
1482   @Test
invalidAuthorityPropagates()1483   public void invalidAuthorityPropagates() {
1484     clientTransport = new OkHttpClientTransport(
1485         new InetSocketAddress("host", 1234),
1486         "invalid_authority",
1487         "userAgent",
1488         executor,
1489         sslSocketFactory,
1490         hostnameVerifier,
1491         ConnectionSpec.CLEARTEXT,
1492         DEFAULT_MAX_MESSAGE_SIZE,
1493         NO_PROXY,
1494         tooManyPingsRunnable,
1495         transportTracer);
1496 
1497     String host = clientTransport.getOverridenHost();
1498     int port = clientTransport.getOverridenPort();
1499 
1500     assertEquals("invalid_authority", host);
1501     assertEquals(1234, port);
1502   }
1503 
1504   @Test
unreachableServer()1505   public void unreachableServer() throws Exception {
1506     clientTransport = new OkHttpClientTransport(
1507         new InetSocketAddress("localhost", 0),
1508         "authority",
1509         "userAgent",
1510         executor,
1511         sslSocketFactory,
1512         hostnameVerifier,
1513         ConnectionSpec.CLEARTEXT,
1514         DEFAULT_MAX_MESSAGE_SIZE,
1515         NO_PROXY,
1516         tooManyPingsRunnable,
1517         new TransportTracer());
1518 
1519     ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
1520     clientTransport.start(listener);
1521     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1522     verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1523     Status status = captor.getValue();
1524     assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
1525     assertTrue(status.getCause().toString(), status.getCause() instanceof IOException);
1526 
1527     MockStreamListener streamListener = new MockStreamListener();
1528     clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT).start(streamListener);
1529     streamListener.waitUntilStreamClosed();
1530     assertEquals(Status.UNAVAILABLE.getCode(), streamListener.status.getCode());
1531   }
1532 
1533   @Test
proxy_200()1534   public void proxy_200() throws Exception {
1535     ServerSocket serverSocket = new ServerSocket(0);
1536     clientTransport = new OkHttpClientTransport(
1537         InetSocketAddress.createUnresolved("theservice", 80),
1538         "authority",
1539         "userAgent",
1540         executor,
1541         sslSocketFactory,
1542         hostnameVerifier,
1543         ConnectionSpec.CLEARTEXT,
1544         DEFAULT_MAX_MESSAGE_SIZE,
1545         new ProxyParameters(
1546             (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW),
1547         tooManyPingsRunnable,
1548         transportTracer);
1549     clientTransport.start(transportListener);
1550 
1551     Socket sock = serverSocket.accept();
1552     serverSocket.close();
1553 
1554     BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8));
1555     assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine());
1556     assertEquals("Host: theservice:80", reader.readLine());
1557     while (!"".equals(reader.readLine())) {}
1558 
1559     sock.getOutputStream().write("HTTP/1.1 200 OK\r\nServer: test\r\n\r\n".getBytes(UTF_8));
1560     sock.getOutputStream().flush();
1561 
1562     assertEquals("PRI * HTTP/2.0", reader.readLine());
1563     assertEquals("", reader.readLine());
1564     assertEquals("SM", reader.readLine());
1565     assertEquals("", reader.readLine());
1566 
1567     // Empty SETTINGS
1568     sock.getOutputStream().write(new byte[] {0, 0, 0, 0, 0x4, 0});
1569     // GOAWAY
1570     sock.getOutputStream().write(new byte[] {
1571         0, 0, 0, 8, 0x7, 0,
1572         0, 0, 0, 0, // last stream id
1573         0, 0, 0, 0, // error code
1574     });
1575     sock.getOutputStream().flush();
1576 
1577     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
1578     while (sock.getInputStream().read() != -1) {}
1579     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1580     sock.close();
1581   }
1582 
1583   @Test
proxy_500()1584   public void proxy_500() throws Exception {
1585     ServerSocket serverSocket = new ServerSocket(0);
1586     clientTransport = new OkHttpClientTransport(
1587         InetSocketAddress.createUnresolved("theservice", 80),
1588         "authority",
1589         "userAgent",
1590         executor,
1591         sslSocketFactory,
1592         hostnameVerifier,
1593         ConnectionSpec.CLEARTEXT,
1594         DEFAULT_MAX_MESSAGE_SIZE,
1595         new ProxyParameters(
1596             (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW),
1597         tooManyPingsRunnable,
1598         transportTracer);
1599     clientTransport.start(transportListener);
1600 
1601     Socket sock = serverSocket.accept();
1602     serverSocket.close();
1603 
1604     BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8));
1605     assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine());
1606     assertEquals("Host: theservice:80", reader.readLine());
1607     while (!"".equals(reader.readLine())) {}
1608 
1609     final String errorText = "text describing error";
1610     sock.getOutputStream().write("HTTP/1.1 500 OH NO\r\n\r\n".getBytes(UTF_8));
1611     sock.getOutputStream().write(errorText.getBytes(UTF_8));
1612     sock.getOutputStream().flush();
1613     sock.shutdownOutput();
1614 
1615     assertEquals(-1, sock.getInputStream().read());
1616 
1617     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1618     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1619     Status error = captor.getValue();
1620     assertTrue("Status didn't contain error code: " + captor.getValue(),
1621         error.getDescription().contains("500"));
1622     assertTrue("Status didn't contain error description: " + captor.getValue(),
1623         error.getDescription().contains("OH NO"));
1624     assertTrue("Status didn't contain error text: " + captor.getValue(),
1625         error.getDescription().contains(errorText));
1626     assertEquals("Not UNAVAILABLE: " + captor.getValue(),
1627         Status.UNAVAILABLE.getCode(), error.getCode());
1628     sock.close();
1629     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1630   }
1631 
1632   @Test
proxy_immediateServerClose()1633   public void proxy_immediateServerClose() throws Exception {
1634     ServerSocket serverSocket = new ServerSocket(0);
1635     clientTransport = new OkHttpClientTransport(
1636         InetSocketAddress.createUnresolved("theservice", 80),
1637         "authority",
1638         "userAgent",
1639         executor,
1640         sslSocketFactory,
1641         hostnameVerifier,
1642         ConnectionSpec.CLEARTEXT,
1643         DEFAULT_MAX_MESSAGE_SIZE,
1644         new ProxyParameters(
1645             (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW),
1646         tooManyPingsRunnable,
1647         transportTracer);
1648     clientTransport.start(transportListener);
1649 
1650     Socket sock = serverSocket.accept();
1651     serverSocket.close();
1652     sock.close();
1653 
1654     ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
1655     verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());
1656     Status error = captor.getValue();
1657     assertTrue("Status didn't contain proxy: " + captor.getValue(),
1658         error.getDescription().contains("proxy"));
1659     assertEquals("Not UNAVAILABLE: " + captor.getValue(),
1660         Status.UNAVAILABLE.getCode(), error.getCode());
1661     verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
1662   }
1663 
1664   @Test
goAway_notUtf8()1665   public void goAway_notUtf8() throws Exception {
1666     initTransport();
1667     // 0xFF is never permitted in UTF-8. 0xF0 should have 3 continuations following, and 0x0a isn't
1668     // a continuation.
1669     frameHandler().goAway(
1670         0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.of((byte) 0xFF, (byte) 0xF0, (byte) 0x0a));
1671 
1672     shutdownAndVerify();
1673   }
1674 
1675   @Test
goAway_notTooManyPings()1676   public void goAway_notTooManyPings() throws Exception {
1677     final AtomicBoolean run = new AtomicBoolean();
1678     tooManyPingsRunnable = new Runnable() {
1679       @Override
1680       public void run() {
1681         run.set(true);
1682       }
1683     };
1684     initTransport();
1685     frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("not_many_pings"));
1686     assertFalse(run.get());
1687 
1688     shutdownAndVerify();
1689   }
1690 
1691   @Test
goAway_tooManyPings()1692   public void goAway_tooManyPings() throws Exception {
1693     final AtomicBoolean run = new AtomicBoolean();
1694     tooManyPingsRunnable = new Runnable() {
1695       @Override
1696       public void run() {
1697         run.set(true);
1698       }
1699     };
1700     initTransport();
1701     frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("too_many_pings"));
1702     assertTrue(run.get());
1703 
1704     shutdownAndVerify();
1705   }
1706 
1707   @Test
goAway_streamListenerRpcProgress()1708   public void goAway_streamListenerRpcProgress() throws Exception {
1709     initTransport();
1710     setMaxConcurrentStreams(2);
1711     MockStreamListener listener1 = new MockStreamListener();
1712     MockStreamListener listener2 = new MockStreamListener();
1713     MockStreamListener listener3 = new MockStreamListener();
1714     OkHttpClientStream stream1 =
1715         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1716     stream1.start(listener1);
1717     OkHttpClientStream stream2 =
1718         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1719     stream2.start(listener2);
1720     OkHttpClientStream stream3 =
1721         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1722     stream3.start(listener3);
1723     waitForStreamPending(1);
1724 
1725     assertEquals(2, activeStreamCount());
1726     assertContainStream(DEFAULT_START_STREAM_ID);
1727     assertContainStream(DEFAULT_START_STREAM_ID + 2);
1728 
1729     frameHandler()
1730         .goAway(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL, ByteString.encodeUtf8("blablabla"));
1731 
1732     listener2.waitUntilStreamClosed();
1733     listener3.waitUntilStreamClosed();
1734     assertNull(listener1.rpcProgress);
1735     assertEquals(REFUSED, listener2.rpcProgress);
1736     assertEquals(REFUSED, listener3.rpcProgress);
1737     assertEquals(1, activeStreamCount());
1738     assertContainStream(DEFAULT_START_STREAM_ID);
1739 
1740     getStream(DEFAULT_START_STREAM_ID).cancel(Status.CANCELLED);
1741 
1742     listener1.waitUntilStreamClosed();
1743     assertEquals(PROCESSED, listener1.rpcProgress);
1744 
1745     shutdownAndVerify();
1746   }
1747 
1748   @Test
reset_streamListenerRpcProgress()1749   public void reset_streamListenerRpcProgress() throws Exception {
1750     initTransport();
1751     MockStreamListener listener1 = new MockStreamListener();
1752     MockStreamListener listener2 = new MockStreamListener();
1753     MockStreamListener listener3 = new MockStreamListener();
1754     OkHttpClientStream stream1 =
1755         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1756     stream1.start(listener1);
1757     OkHttpClientStream stream2 =
1758         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1759     stream2.start(listener2);
1760     OkHttpClientStream stream3 =
1761         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1762     stream3.start(listener3);
1763 
1764     assertEquals(3, activeStreamCount());
1765     assertContainStream(DEFAULT_START_STREAM_ID);
1766     assertContainStream(DEFAULT_START_STREAM_ID + 2);
1767     assertContainStream(DEFAULT_START_STREAM_ID + 4);
1768 
1769     frameHandler().rstStream(DEFAULT_START_STREAM_ID + 2, ErrorCode.REFUSED_STREAM);
1770 
1771     listener2.waitUntilStreamClosed();
1772     assertNull(listener1.rpcProgress);
1773     assertEquals(REFUSED, listener2.rpcProgress);
1774     assertNull(listener3.rpcProgress);
1775 
1776     frameHandler().rstStream(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL);
1777     listener1.waitUntilStreamClosed();
1778     assertEquals(PROCESSED, listener1.rpcProgress);
1779     assertNull(listener3.rpcProgress);
1780 
1781     getStream(DEFAULT_START_STREAM_ID + 4).cancel(Status.CANCELLED);
1782 
1783     listener3.waitUntilStreamClosed();
1784     assertEquals(PROCESSED, listener3.rpcProgress);
1785 
1786     shutdownAndVerify();
1787   }
1788 
activeStreamCount()1789   private int activeStreamCount() {
1790     return clientTransport.getActiveStreams().length;
1791   }
1792 
getStream(int streamId)1793   private OkHttpClientStream getStream(int streamId) {
1794     return clientTransport.getStream(streamId);
1795   }
1796 
assertContainStream(int streamId)1797   void assertContainStream(int streamId) {
1798     assertNotNull(clientTransport.getStream(streamId));
1799   }
1800 
frameHandler()1801   private ClientFrameHandler frameHandler() throws Exception {
1802     return clientTransport.getHandler();
1803   }
1804 
waitForStreamPending(int expected)1805   private void waitForStreamPending(int expected) throws Exception {
1806     int duration = TIME_OUT_MS / 10;
1807     for (int i = 0; i < 10; i++) {
1808       if (clientTransport.getPendingStreamSize() == expected) {
1809         return;
1810       }
1811       Thread.sleep(duration);
1812     }
1813     assertEquals(expected, clientTransport.getPendingStreamSize());
1814   }
1815 
assertNewStreamFail()1816   private void assertNewStreamFail() throws Exception {
1817     MockStreamListener listener = new MockStreamListener();
1818     OkHttpClientStream stream =
1819         clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
1820     stream.start(listener);
1821     listener.waitUntilStreamClosed();
1822     assertFalse(listener.status.isOk());
1823   }
1824 
setMaxConcurrentStreams(int num)1825   private void setMaxConcurrentStreams(int num) throws Exception {
1826     Settings settings = new Settings();
1827     OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, num);
1828     frameHandler().settings(false, settings);
1829   }
1830 
setInitialWindowSize(int size)1831   private void setInitialWindowSize(int size) throws Exception {
1832     Settings settings = new Settings();
1833     OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, size);
1834     frameHandler().settings(false, settings);
1835   }
1836 
createMessageFrame(String message)1837   private static Buffer createMessageFrame(String message) {
1838     return createMessageFrame(message.getBytes(UTF_8));
1839   }
1840 
createMessageFrame(byte[] message)1841   private static Buffer createMessageFrame(byte[] message) {
1842     Buffer buffer = new Buffer();
1843     buffer.writeByte(0 /* UNCOMPRESSED */);
1844     buffer.writeInt(message.length);
1845     buffer.write(message);
1846     return buffer;
1847   }
1848 
grpcResponseHeaders()1849   private List<Header> grpcResponseHeaders() {
1850     return ImmutableList.of(
1851         new Header(":status", "200"),
1852         CONTENT_TYPE_HEADER);
1853   }
1854 
grpcResponseTrailers()1855   private List<Header> grpcResponseTrailers() {
1856     return ImmutableList.of(
1857         new Header(InternalStatus.CODE_KEY.name(), "0"),
1858         // Adding Content-Type and :status for testing responses with only a single HEADERS frame.
1859         new Header(":status", "200"),
1860         CONTENT_TYPE_HEADER);
1861   }
1862 
anyListHeader()1863   private static List<Header> anyListHeader() {
1864     return any();
1865   }
1866 
1867   private static class MockFrameReader implements FrameReader {
1868     final CountDownLatch closed = new CountDownLatch(1);
1869 
1870     enum Result {
1871       THROW_EXCEPTION,
1872       RETURN_FALSE,
1873       THROW_ERROR
1874     }
1875 
1876     final LinkedBlockingQueue<Result> nextResults = new LinkedBlockingQueue<Result>();
1877 
1878     @Override
close()1879     public void close() throws IOException {
1880       closed.countDown();
1881     }
1882 
assertClosed()1883     void assertClosed() {
1884       try {
1885         if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) {
1886           fail("Failed waiting frame reader to be closed.");
1887         }
1888       } catch (InterruptedException e) {
1889         Thread.currentThread().interrupt();
1890         fail("Interrupted while waiting for frame reader to be closed.");
1891       }
1892     }
1893 
1894     // The wait is safe; nextFrame is called in a loop and can have spurious wakeups
1895     @SuppressWarnings("WaitNotInLoop")
1896     @Override
nextFrame(Handler handler)1897     public boolean nextFrame(Handler handler) throws IOException {
1898       Result result;
1899       try {
1900         result = nextResults.take();
1901       } catch (InterruptedException e) {
1902         Thread.currentThread().interrupt();
1903         throw new IOException(e);
1904       }
1905       switch (result) {
1906         case THROW_EXCEPTION:
1907           throw new IOException(NETWORK_ISSUE_MESSAGE);
1908         case RETURN_FALSE:
1909           return false;
1910         case THROW_ERROR:
1911           throw new Error(ERROR_MESSAGE);
1912         default:
1913           throw new UnsupportedOperationException("unimplemented: " + result);
1914       }
1915     }
1916 
throwIoExceptionForNextFrame()1917     void throwIoExceptionForNextFrame() {
1918       nextResults.add(Result.THROW_EXCEPTION);
1919     }
1920 
throwErrorForNextFrame()1921     void throwErrorForNextFrame() {
1922       nextResults.add(Result.THROW_ERROR);
1923     }
1924 
nextFrameAtEndOfStream()1925     void nextFrameAtEndOfStream() {
1926       nextResults.add(Result.RETURN_FALSE);
1927     }
1928 
1929     @Override
readConnectionPreface()1930     public void readConnectionPreface() throws IOException {
1931       // not used.
1932     }
1933   }
1934 
1935   private static class MockStreamListener implements ClientStreamListener {
1936     Status status;
1937     Metadata headers;
1938     Metadata trailers;
1939     RpcProgress rpcProgress;
1940     CountDownLatch closed = new CountDownLatch(1);
1941     ArrayList<String> messages = new ArrayList<>();
1942     boolean onReadyCalled;
1943 
MockStreamListener()1944     MockStreamListener() {
1945     }
1946 
1947     @Override
headersRead(Metadata headers)1948     public void headersRead(Metadata headers) {
1949       this.headers = headers;
1950     }
1951 
1952     @Override
messagesAvailable(MessageProducer producer)1953     public void messagesAvailable(MessageProducer producer) {
1954       InputStream inputStream;
1955       while ((inputStream = producer.next()) != null) {
1956         String msg = getContent(inputStream);
1957         if (msg != null) {
1958           messages.add(msg);
1959         }
1960       }
1961     }
1962 
1963     @Override
closed(Status status, Metadata trailers)1964     public void closed(Status status, Metadata trailers) {
1965       closed(status, PROCESSED, trailers);
1966     }
1967 
1968     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)1969     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
1970       this.status = status;
1971       this.trailers = trailers;
1972       this.rpcProgress = rpcProgress;
1973       closed.countDown();
1974     }
1975 
1976     @Override
onReady()1977     public void onReady() {
1978       onReadyCalled = true;
1979     }
1980 
isOnReadyCalled()1981     boolean isOnReadyCalled() {
1982       boolean value = onReadyCalled;
1983       onReadyCalled = false;
1984       return value;
1985     }
1986 
waitUntilStreamClosed()1987     void waitUntilStreamClosed() throws InterruptedException {
1988       if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) {
1989         fail("Failed waiting stream to be closed.");
1990       }
1991     }
1992 
1993     @SuppressWarnings("Finally") // We don't care about suppressed exceptions in the test
getContent(InputStream message)1994     static String getContent(InputStream message) {
1995       BufferedReader br = new BufferedReader(new InputStreamReader(message, UTF_8));
1996       try {
1997         // Only one line message is used in this test.
1998         return br.readLine();
1999       } catch (IOException e) {
2000         return null;
2001       } finally {
2002         try {
2003           message.close();
2004         } catch (IOException e) {
2005           // Ignore
2006         }
2007       }
2008     }
2009   }
2010 
2011   private static class MockSocket extends Socket {
2012     MockFrameReader frameReader;
2013 
MockSocket(MockFrameReader frameReader)2014     MockSocket(MockFrameReader frameReader) {
2015       this.frameReader = frameReader;
2016     }
2017 
2018     @Override
close()2019     public synchronized void close() {
2020       frameReader.nextFrameAtEndOfStream();
2021     }
2022 
2023     @Override
getLocalSocketAddress()2024     public SocketAddress getLocalSocketAddress() {
2025       return InetSocketAddress.createUnresolved("localhost", 4000);
2026     }
2027   }
2028 
2029   static class PingCallbackImpl implements ClientTransport.PingCallback {
2030     int invocationCount;
2031     long roundTripTime;
2032     Throwable failureCause;
2033 
2034     @Override
onSuccess(long roundTripTimeNanos)2035     public void onSuccess(long roundTripTimeNanos) {
2036       invocationCount++;
2037       this.roundTripTime = roundTripTimeNanos;
2038     }
2039 
2040     @Override
onFailure(Throwable cause)2041     public void onFailure(Throwable cause) {
2042       invocationCount++;
2043       this.failureCause = cause;
2044     }
2045   }
2046 
allowTransportConnected()2047   private void allowTransportConnected() {
2048     delayConnectedCallback.allowConnected();
2049   }
2050 
shutdownAndVerify()2051   private void shutdownAndVerify() {
2052     clientTransport.shutdown(SHUTDOWN_REASON);
2053     assertEquals(0, activeStreamCount());
2054     try {
2055       verify(frameWriter, timeout(TIME_OUT_MS)).close();
2056     } catch (IOException e) {
2057       throw new RuntimeException(e);
2058 
2059     }
2060     frameReader.assertClosed();
2061   }
2062 
2063   private static class DelayConnectedCallback implements Runnable {
2064     SettableFuture<Void> delayed = SettableFuture.create();
2065 
2066     @Override
run()2067     public void run() {
2068       Futures.getUnchecked(delayed);
2069     }
2070 
allowConnected()2071     void allowConnected() {
2072       delayed.set(null);
2073     }
2074   }
2075 
getTransportStats(InternalInstrumented<SocketStats> obj)2076   private static TransportStats getTransportStats(InternalInstrumented<SocketStats> obj)
2077       throws ExecutionException, InterruptedException {
2078     return obj.getStats().get().data;
2079   }
2080 }
2081