1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import static com.google.common.base.Charsets.UTF_8; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; 22 import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; 23 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; 24 import static io.grpc.okhttp.Headers.CONTENT_TYPE_HEADER; 25 import static io.grpc.okhttp.Headers.METHOD_HEADER; 26 import static io.grpc.okhttp.Headers.SCHEME_HEADER; 27 import static io.grpc.okhttp.Headers.TE_HEADER; 28 import static org.junit.Assert.assertEquals; 29 import static org.junit.Assert.assertFalse; 30 import static org.junit.Assert.assertNotNull; 31 import static org.junit.Assert.assertNull; 32 import static org.junit.Assert.assertSame; 33 import static org.junit.Assert.assertTrue; 34 import static org.junit.Assert.fail; 35 import static org.mockito.Matchers.any; 36 import static org.mockito.Matchers.anyBoolean; 37 import static org.mockito.Matchers.anyInt; 38 import static org.mockito.Matchers.eq; 39 import static org.mockito.Matchers.isA; 40 import static org.mockito.Matchers.same; 41 import static org.mockito.Mockito.inOrder; 42 import static org.mockito.Mockito.mock; 43 import static org.mockito.Mockito.never; 44 import static org.mockito.Mockito.reset; 45 import static org.mockito.Mockito.timeout; 46 import static org.mockito.Mockito.verify; 47 import static org.mockito.Mockito.verifyNoMoreInteractions; 48 import static org.mockito.Mockito.verifyZeroInteractions; 49 import static org.mockito.Mockito.when; 50 51 import com.google.common.base.Stopwatch; 52 import com.google.common.base.Supplier; 53 import com.google.common.base.Ticker; 54 import com.google.common.collect.ImmutableList; 55 import com.google.common.util.concurrent.Futures; 56 import com.google.common.util.concurrent.MoreExecutors; 57 import com.google.common.util.concurrent.SettableFuture; 58 import io.grpc.CallOptions; 59 import io.grpc.InternalChannelz.SocketStats; 60 import io.grpc.InternalChannelz.TransportStats; 61 import io.grpc.InternalInstrumented; 62 import io.grpc.InternalStatus; 63 import io.grpc.Metadata; 64 import io.grpc.MethodDescriptor; 65 import io.grpc.MethodDescriptor.MethodType; 66 import io.grpc.Status; 67 import io.grpc.Status.Code; 68 import io.grpc.StatusException; 69 import io.grpc.internal.AbstractStream; 70 import io.grpc.internal.ClientStreamListener; 71 import io.grpc.internal.ClientTransport; 72 import io.grpc.internal.GrpcUtil; 73 import io.grpc.internal.ManagedClientTransport; 74 import io.grpc.internal.ProxyParameters; 75 import io.grpc.internal.TransportTracer; 76 import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler; 77 import io.grpc.okhttp.internal.ConnectionSpec; 78 import io.grpc.okhttp.internal.framed.ErrorCode; 79 import io.grpc.okhttp.internal.framed.FrameReader; 80 import io.grpc.okhttp.internal.framed.FrameWriter; 81 import io.grpc.okhttp.internal.framed.Header; 82 import io.grpc.okhttp.internal.framed.HeadersMode; 83 import io.grpc.okhttp.internal.framed.Settings; 84 import io.grpc.testing.TestMethodDescriptors; 85 import java.io.BufferedReader; 86 import java.io.ByteArrayInputStream; 87 import java.io.IOException; 88 import java.io.InputStream; 89 import java.io.InputStreamReader; 90 import java.net.InetSocketAddress; 91 import java.net.ServerSocket; 92 import java.net.Socket; 93 import java.net.SocketAddress; 94 import java.util.ArrayList; 95 import java.util.Arrays; 96 import java.util.List; 97 import java.util.concurrent.CountDownLatch; 98 import java.util.concurrent.ExecutionException; 99 import java.util.concurrent.ExecutorService; 100 import java.util.concurrent.Executors; 101 import java.util.concurrent.LinkedBlockingQueue; 102 import java.util.concurrent.TimeUnit; 103 import java.util.concurrent.atomic.AtomicBoolean; 104 import javax.annotation.Nullable; 105 import javax.net.ssl.HostnameVerifier; 106 import javax.net.ssl.SSLSocketFactory; 107 import okio.Buffer; 108 import okio.ByteString; 109 import org.junit.After; 110 import org.junit.Before; 111 import org.junit.Rule; 112 import org.junit.Test; 113 import org.junit.rules.Timeout; 114 import org.junit.runner.RunWith; 115 import org.junit.runners.JUnit4; 116 import org.mockito.ArgumentCaptor; 117 import org.mockito.InOrder; 118 import org.mockito.Matchers; 119 import org.mockito.Mock; 120 import org.mockito.MockitoAnnotations; 121 122 /** 123 * Tests for {@link OkHttpClientTransport}. 124 */ 125 @RunWith(JUnit4.class) 126 public class OkHttpClientTransportTest { 127 private static final int TIME_OUT_MS = 2000; 128 private static final String NETWORK_ISSUE_MESSAGE = "network issue"; 129 private static final String ERROR_MESSAGE = "simulated error"; 130 // The gRPC header length, which includes 1 byte compression flag and 4 bytes message length. 131 private static final int HEADER_LENGTH = 5; 132 private static final Status SHUTDOWN_REASON = Status.UNAVAILABLE.withDescription("for test"); 133 private static final ProxyParameters NO_PROXY = null; 134 private static final String NO_USER = null; 135 private static final String NO_PW = null; 136 private static final int DEFAULT_START_STREAM_ID = 3; 137 138 @Rule public final Timeout globalTimeout = Timeout.seconds(10); 139 140 @Mock 141 private FrameWriter frameWriter; 142 143 private MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod(); 144 145 @Mock 146 private ManagedClientTransport.Listener transportListener; 147 148 private final SSLSocketFactory sslSocketFactory = null; 149 private final HostnameVerifier hostnameVerifier = null; 150 private final TransportTracer transportTracer = new TransportTracer(); 151 private OkHttpClientTransport clientTransport; 152 private MockFrameReader frameReader; 153 private ExecutorService executor = Executors.newCachedThreadPool(); 154 private long nanoTime; // backs a ticker, for testing ping round-trip time measurement 155 private SettableFuture<Void> connectedFuture; 156 private DelayConnectedCallback delayConnectedCallback; 157 private Runnable tooManyPingsRunnable = new Runnable() { 158 @Override public void run() { 159 throw new AssertionError(); 160 } 161 }; 162 163 /** Set up for test. */ 164 @Before setUp()165 public void setUp() { 166 MockitoAnnotations.initMocks(this); 167 when(frameWriter.maxDataLength()).thenReturn(Integer.MAX_VALUE); 168 frameReader = new MockFrameReader(); 169 } 170 171 @After tearDown()172 public void tearDown() { 173 executor.shutdownNow(); 174 } 175 initTransport()176 private void initTransport() throws Exception { 177 startTransport(DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, null); 178 } 179 initTransport(int startId)180 private void initTransport(int startId) throws Exception { 181 startTransport(startId, null, true, DEFAULT_MAX_MESSAGE_SIZE, null); 182 } 183 initTransportAndDelayConnected()184 private void initTransportAndDelayConnected() throws Exception { 185 delayConnectedCallback = new DelayConnectedCallback(); 186 startTransport( 187 DEFAULT_START_STREAM_ID, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null); 188 } 189 startTransport(int startId, @Nullable Runnable connectingCallback, boolean waitingForConnected, int maxMessageSize, String userAgent)190 private void startTransport(int startId, @Nullable Runnable connectingCallback, 191 boolean waitingForConnected, int maxMessageSize, String userAgent) throws Exception { 192 connectedFuture = SettableFuture.create(); 193 final Ticker ticker = new Ticker() { 194 @Override 195 public long read() { 196 return nanoTime; 197 } 198 }; 199 Supplier<Stopwatch> stopwatchSupplier = new Supplier<Stopwatch>() { 200 @Override 201 public Stopwatch get() { 202 return Stopwatch.createUnstarted(ticker); 203 } 204 }; 205 clientTransport = new OkHttpClientTransport( 206 userAgent, 207 executor, 208 frameReader, 209 frameWriter, 210 startId, 211 new MockSocket(frameReader), 212 stopwatchSupplier, 213 connectingCallback, 214 connectedFuture, 215 maxMessageSize, 216 tooManyPingsRunnable, 217 new TransportTracer()); 218 clientTransport.start(transportListener); 219 if (waitingForConnected) { 220 connectedFuture.get(TIME_OUT_MS, TimeUnit.MILLISECONDS); 221 } 222 } 223 224 @Test testToString()225 public void testToString() throws Exception { 226 InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); 227 clientTransport = new OkHttpClientTransport( 228 address, 229 "hostname", 230 /*agent=*/ null, 231 executor, 232 sslSocketFactory, 233 hostnameVerifier, 234 OkHttpChannelBuilder.INTERNAL_DEFAULT_CONNECTION_SPEC, 235 DEFAULT_MAX_MESSAGE_SIZE, 236 NO_PROXY, 237 tooManyPingsRunnable, 238 transportTracer); 239 String s = clientTransport.toString(); 240 assertTrue("Unexpected: " + s, s.contains("OkHttpClientTransport")); 241 assertTrue("Unexpected: " + s, s.contains(address.toString())); 242 } 243 244 @Test maxMessageSizeShouldBeEnforced()245 public void maxMessageSizeShouldBeEnforced() throws Exception { 246 // Allow the response payloads of up to 1 byte. 247 startTransport(3, null, true, 1, null); 248 249 MockStreamListener listener = new MockStreamListener(); 250 OkHttpClientStream stream = 251 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 252 stream.start(listener); 253 stream.request(1); 254 assertContainStream(3); 255 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 256 assertNotNull(listener.headers); 257 258 // Receive the message. 259 final String message = "Hello Client"; 260 Buffer buffer = createMessageFrame(message); 261 frameHandler().data(false, 3, buffer, (int) buffer.size()); 262 263 listener.waitUntilStreamClosed(); 264 assertEquals(Code.RESOURCE_EXHAUSTED, listener.status.getCode()); 265 shutdownAndVerify(); 266 } 267 268 /** 269 * When nextFrame throws IOException, the transport should be aborted. 270 */ 271 @Test nextFrameThrowIoException()272 public void nextFrameThrowIoException() throws Exception { 273 initTransport(); 274 MockStreamListener listener1 = new MockStreamListener(); 275 MockStreamListener listener2 = new MockStreamListener(); 276 OkHttpClientStream stream1 = 277 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 278 stream1.start(listener1); 279 stream1.request(1); 280 OkHttpClientStream stream2 = 281 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 282 stream2.start(listener2); 283 stream2.request(1); 284 assertEquals(2, activeStreamCount()); 285 assertContainStream(3); 286 assertContainStream(5); 287 frameReader.throwIoExceptionForNextFrame(); 288 listener1.waitUntilStreamClosed(); 289 listener2.waitUntilStreamClosed(); 290 291 assertEquals(0, activeStreamCount()); 292 assertEquals(Status.UNAVAILABLE.getCode(), listener1.status.getCode()); 293 assertEquals(NETWORK_ISSUE_MESSAGE, listener1.status.getCause().getMessage()); 294 assertEquals(Status.UNAVAILABLE.getCode(), listener2.status.getCode()); 295 assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage()); 296 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 297 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 298 shutdownAndVerify(); 299 } 300 301 /** 302 * Test that even if an Error is thrown from the reading loop of the transport, 303 * it can still clean up and call transportShutdown() and transportTerminated() as expected 304 * by the channel. 305 */ 306 @Test nextFrameThrowsError()307 public void nextFrameThrowsError() throws Exception { 308 initTransport(); 309 MockStreamListener listener = new MockStreamListener(); 310 OkHttpClientStream stream = 311 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 312 stream.start(listener); 313 stream.request(1); 314 assertEquals(1, activeStreamCount()); 315 assertContainStream(3); 316 frameReader.throwErrorForNextFrame(); 317 listener.waitUntilStreamClosed(); 318 319 assertEquals(0, activeStreamCount()); 320 assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); 321 assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage()); 322 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 323 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 324 shutdownAndVerify(); 325 } 326 327 @Test nextFrameReturnFalse()328 public void nextFrameReturnFalse() throws Exception { 329 initTransport(); 330 MockStreamListener listener = new MockStreamListener(); 331 OkHttpClientStream stream = 332 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 333 stream.start(listener); 334 stream.request(1); 335 frameReader.nextFrameAtEndOfStream(); 336 listener.waitUntilStreamClosed(); 337 assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode()); 338 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 339 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 340 shutdownAndVerify(); 341 } 342 343 @Test readMessages()344 public void readMessages() throws Exception { 345 initTransport(); 346 final int numMessages = 10; 347 final String message = "Hello Client"; 348 MockStreamListener listener = new MockStreamListener(); 349 OkHttpClientStream stream = 350 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 351 stream.start(listener); 352 stream.request(numMessages); 353 assertContainStream(3); 354 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 355 assertNotNull(listener.headers); 356 for (int i = 0; i < numMessages; i++) { 357 Buffer buffer = createMessageFrame(message + i); 358 frameHandler().data(false, 3, buffer, (int) buffer.size()); 359 } 360 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 361 listener.waitUntilStreamClosed(); 362 assertEquals(Status.OK, listener.status); 363 assertNotNull(listener.trailers); 364 assertEquals(numMessages, listener.messages.size()); 365 for (int i = 0; i < numMessages; i++) { 366 assertEquals(message + i, listener.messages.get(i)); 367 } 368 shutdownAndVerify(); 369 } 370 371 @Test receivedHeadersForInvalidStreamShouldKillConnection()372 public void receivedHeadersForInvalidStreamShouldKillConnection() throws Exception { 373 initTransport(); 374 // Empty headers block without correct content type or status 375 frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(), 376 HeadersMode.HTTP_20_HEADERS); 377 verify(frameWriter, timeout(TIME_OUT_MS)) 378 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 379 verify(transportListener).transportShutdown(isA(Status.class)); 380 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 381 shutdownAndVerify(); 382 } 383 384 @Test receivedDataForInvalidStreamShouldKillConnection()385 public void receivedDataForInvalidStreamShouldKillConnection() throws Exception { 386 initTransport(); 387 frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000); 388 verify(frameWriter, timeout(TIME_OUT_MS)) 389 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 390 verify(transportListener).transportShutdown(isA(Status.class)); 391 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 392 shutdownAndVerify(); 393 } 394 395 @Test invalidInboundHeadersCancelStream()396 public void invalidInboundHeadersCancelStream() throws Exception { 397 initTransport(); 398 MockStreamListener listener = new MockStreamListener(); 399 OkHttpClientStream stream = 400 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 401 stream.start(listener); 402 stream.request(1); 403 assertContainStream(3); 404 // Headers block without correct content type or status 405 frameHandler().headers(false, false, 3, 0, Arrays.asList(new Header("random", "4")), 406 HeadersMode.HTTP_20_HEADERS); 407 // Now wait to receive 1000 bytes of data so we can have a better error message before 408 // cancelling the streaam. 409 frameHandler().data(false, 3, createMessageFrame(new String(new char[1000])), 1000); 410 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 411 assertNull(listener.headers); 412 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 413 assertNotNull(listener.trailers); 414 assertEquals("4", listener.trailers 415 .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER))); 416 shutdownAndVerify(); 417 } 418 419 @Test invalidInboundTrailersPropagateToMetadata()420 public void invalidInboundTrailersPropagateToMetadata() throws Exception { 421 initTransport(); 422 MockStreamListener listener = new MockStreamListener(); 423 OkHttpClientStream stream = 424 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 425 stream.start(listener); 426 stream.request(1); 427 assertContainStream(3); 428 // Headers block with EOS without correct content type or status 429 frameHandler().headers(true, true, 3, 0, Arrays.asList(new Header("random", "4")), 430 HeadersMode.HTTP_20_HEADERS); 431 assertNull(listener.headers); 432 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 433 assertNotNull(listener.trailers); 434 assertEquals("4", listener.trailers 435 .get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER))); 436 shutdownAndVerify(); 437 } 438 439 @Test readStatus()440 public void readStatus() throws Exception { 441 initTransport(); 442 MockStreamListener listener = new MockStreamListener(); 443 OkHttpClientStream stream = 444 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 445 stream.start(listener); 446 assertContainStream(3); 447 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 448 listener.waitUntilStreamClosed(); 449 assertEquals(Status.Code.OK, listener.status.getCode()); 450 shutdownAndVerify(); 451 } 452 453 @Test receiveReset()454 public void receiveReset() throws Exception { 455 initTransport(); 456 MockStreamListener listener = new MockStreamListener(); 457 OkHttpClientStream stream = 458 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 459 stream.start(listener); 460 assertContainStream(3); 461 frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR); 462 listener.waitUntilStreamClosed(); 463 464 assertThat(listener.status.getDescription()).contains("Rst Stream"); 465 assertThat(listener.status.getCode()).isEqualTo(Code.INTERNAL); 466 shutdownAndVerify(); 467 } 468 469 @Test cancelStream()470 public void cancelStream() throws Exception { 471 initTransport(); 472 MockStreamListener listener = new MockStreamListener(); 473 OkHttpClientStream stream = 474 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 475 stream.start(listener); 476 getStream(3).cancel(Status.CANCELLED); 477 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 478 listener.waitUntilStreamClosed(); 479 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 480 listener.status.getCode()); 481 shutdownAndVerify(); 482 } 483 484 @Test addDefaultUserAgent()485 public void addDefaultUserAgent() throws Exception { 486 initTransport(); 487 MockStreamListener listener = new MockStreamListener(); 488 OkHttpClientStream stream = 489 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 490 stream.start(listener); 491 Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(), 492 GrpcUtil.getGrpcUserAgent("okhttp", null)); 493 List<Header> expectedHeaders = Arrays.asList(SCHEME_HEADER, METHOD_HEADER, 494 new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"), 495 new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()), 496 userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER); 497 verify(frameWriter, timeout(TIME_OUT_MS)) 498 .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); 499 getStream(3).cancel(Status.CANCELLED); 500 shutdownAndVerify(); 501 } 502 503 @Test overrideDefaultUserAgent()504 public void overrideDefaultUserAgent() throws Exception { 505 startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, "fakeUserAgent"); 506 MockStreamListener listener = new MockStreamListener(); 507 OkHttpClientStream stream = 508 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 509 stream.start(listener); 510 List<Header> expectedHeaders = Arrays.asList(SCHEME_HEADER, METHOD_HEADER, 511 new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"), 512 new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()), 513 new Header(GrpcUtil.USER_AGENT_KEY.name(), 514 GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent")), 515 CONTENT_TYPE_HEADER, TE_HEADER); 516 verify(frameWriter, timeout(TIME_OUT_MS)) 517 .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders)); 518 getStream(3).cancel(Status.CANCELLED); 519 shutdownAndVerify(); 520 } 521 522 @Test cancelStreamForDeadlineExceeded()523 public void cancelStreamForDeadlineExceeded() throws Exception { 524 initTransport(); 525 MockStreamListener listener = new MockStreamListener(); 526 OkHttpClientStream stream = 527 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 528 stream.start(listener); 529 getStream(3).cancel(Status.DEADLINE_EXCEEDED); 530 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 531 listener.waitUntilStreamClosed(); 532 shutdownAndVerify(); 533 } 534 535 @Test writeMessage()536 public void writeMessage() throws Exception { 537 initTransport(); 538 final String message = "Hello Server"; 539 MockStreamListener listener = new MockStreamListener(); 540 OkHttpClientStream stream = 541 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 542 stream.start(listener); 543 InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); 544 assertEquals(12, input.available()); 545 stream.writeMessage(input); 546 stream.flush(); 547 ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class); 548 verify(frameWriter, timeout(TIME_OUT_MS)) 549 .data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH)); 550 Buffer sentFrame = captor.getValue(); 551 assertEquals(createMessageFrame(message), sentFrame); 552 stream.cancel(Status.CANCELLED); 553 shutdownAndVerify(); 554 } 555 556 @Test transportTracer_windowSizeDefault()557 public void transportTracer_windowSizeDefault() throws Exception { 558 initTransport(); 559 TransportStats stats = getTransportStats(clientTransport); 560 assertEquals(Utils.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); 561 // okhttp does not track local window sizes 562 assertEquals(-1, stats.localFlowControlWindow); 563 } 564 565 @Test transportTracer_windowSize_remote()566 public void transportTracer_windowSize_remote() throws Exception { 567 initTransport(); 568 TransportStats before = getTransportStats(clientTransport); 569 assertEquals(Utils.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); 570 // okhttp does not track local window sizes 571 assertEquals(-1, before.localFlowControlWindow); 572 573 frameHandler().windowUpdate(0, 1000); 574 TransportStats after = getTransportStats(clientTransport); 575 assertEquals(Utils.DEFAULT_WINDOW_SIZE + 1000, after.remoteFlowControlWindow); 576 // okhttp does not track local window sizes 577 assertEquals(-1, after.localFlowControlWindow); 578 } 579 580 @Test windowUpdate()581 public void windowUpdate() throws Exception { 582 initTransport(); 583 MockStreamListener listener1 = new MockStreamListener(); 584 MockStreamListener listener2 = new MockStreamListener(); 585 OkHttpClientStream stream1 = 586 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 587 stream1.start(listener1); 588 stream1.request(2); 589 590 OkHttpClientStream stream2 = 591 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 592 stream2.start(listener2); 593 stream2.request(2); 594 assertEquals(2, activeStreamCount()); 595 stream1 = getStream(3); 596 stream2 = getStream(5); 597 598 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 599 frameHandler().headers(false, false, 5, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 600 601 int messageLength = Utils.DEFAULT_WINDOW_SIZE / 4; 602 byte[] fakeMessage = new byte[messageLength]; 603 604 // Stream 1 receives a message 605 Buffer buffer = createMessageFrame(fakeMessage); 606 int messageFrameLength = (int) buffer.size(); 607 frameHandler().data(false, 3, buffer, messageFrameLength); 608 609 // Stream 2 receives a message 610 buffer = createMessageFrame(fakeMessage); 611 frameHandler().data(false, 5, buffer, messageFrameLength); 612 613 verify(frameWriter, timeout(TIME_OUT_MS)) 614 .windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); 615 reset(frameWriter); 616 617 // Stream 1 receives another message 618 buffer = createMessageFrame(fakeMessage); 619 frameHandler().data(false, 3, buffer, messageFrameLength); 620 621 verify(frameWriter, timeout(TIME_OUT_MS)) 622 .windowUpdate(eq(3), eq((long) 2 * messageFrameLength)); 623 624 // Stream 2 receives another message 625 buffer = createMessageFrame(fakeMessage); 626 frameHandler().data(false, 5, buffer, messageFrameLength); 627 628 verify(frameWriter, timeout(TIME_OUT_MS)) 629 .windowUpdate(eq(5), eq((long) 2 * messageFrameLength)); 630 verify(frameWriter, timeout(TIME_OUT_MS)) 631 .windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); 632 633 stream1.cancel(Status.CANCELLED); 634 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 635 listener1.waitUntilStreamClosed(); 636 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 637 listener1.status.getCode()); 638 639 stream2.cancel(Status.CANCELLED); 640 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(5), eq(ErrorCode.CANCEL)); 641 listener2.waitUntilStreamClosed(); 642 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 643 listener2.status.getCode()); 644 shutdownAndVerify(); 645 } 646 647 @Test windowUpdateWithInboundFlowControl()648 public void windowUpdateWithInboundFlowControl() throws Exception { 649 initTransport(); 650 MockStreamListener listener = new MockStreamListener(); 651 OkHttpClientStream stream = 652 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 653 stream.start(listener); 654 int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1; 655 byte[] fakeMessage = new byte[messageLength]; 656 657 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 658 Buffer buffer = createMessageFrame(fakeMessage); 659 long messageFrameLength = buffer.size(); 660 frameHandler().data(false, 3, buffer, (int) messageFrameLength); 661 ArgumentCaptor<Integer> idCaptor = ArgumentCaptor.forClass(Integer.class); 662 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate( 663 idCaptor.capture(), eq(messageFrameLength)); 664 // Should only send window update for the connection. 665 assertEquals(1, idCaptor.getAllValues().size()); 666 assertEquals(0, (int)idCaptor.getValue()); 667 668 stream.request(1); 669 // We return the bytes for the stream window as we read the message. 670 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(eq(3), eq(messageFrameLength)); 671 672 getStream(3).cancel(Status.CANCELLED); 673 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 674 listener.waitUntilStreamClosed(); 675 assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL).getCode(), 676 listener.status.getCode()); 677 shutdownAndVerify(); 678 } 679 680 @Test outboundFlowControl()681 public void outboundFlowControl() throws Exception { 682 initTransport(); 683 MockStreamListener listener = new MockStreamListener(); 684 OkHttpClientStream stream = 685 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 686 stream.start(listener); 687 // The first message should be sent out. 688 int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1; 689 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 690 stream.writeMessage(input); 691 stream.flush(); 692 verify(frameWriter, timeout(TIME_OUT_MS)).data( 693 eq(false), eq(3), any(Buffer.class), eq(messageLength + HEADER_LENGTH)); 694 695 696 // The second message should be partially sent out. 697 input = new ByteArrayInputStream(new byte[messageLength]); 698 stream.writeMessage(input); 699 stream.flush(); 700 int partiallySentSize = 701 Utils.DEFAULT_WINDOW_SIZE - messageLength - HEADER_LENGTH; 702 verify(frameWriter, timeout(TIME_OUT_MS)) 703 .data(eq(false), eq(3), any(Buffer.class), eq(partiallySentSize)); 704 705 // Get more credit, the rest data should be sent out. 706 frameHandler().windowUpdate(3, Utils.DEFAULT_WINDOW_SIZE); 707 frameHandler().windowUpdate(0, Utils.DEFAULT_WINDOW_SIZE); 708 verify(frameWriter, timeout(TIME_OUT_MS)).data( 709 eq(false), eq(3), any(Buffer.class), 710 eq(messageLength + HEADER_LENGTH - partiallySentSize)); 711 712 stream.cancel(Status.CANCELLED); 713 listener.waitUntilStreamClosed(); 714 shutdownAndVerify(); 715 } 716 717 @Test outboundFlowControlWithInitialWindowSizeChange()718 public void outboundFlowControlWithInitialWindowSizeChange() throws Exception { 719 initTransport(); 720 MockStreamListener listener = new MockStreamListener(); 721 OkHttpClientStream stream = 722 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 723 stream.start(listener); 724 int messageLength = 20; 725 setInitialWindowSize(HEADER_LENGTH + 10); 726 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 727 stream.writeMessage(input); 728 stream.flush(); 729 // part of the message can be sent. 730 verify(frameWriter, timeout(TIME_OUT_MS)) 731 .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10)); 732 // Avoid connection flow control. 733 frameHandler().windowUpdate(0, HEADER_LENGTH + 10); 734 735 // Increase initial window size 736 setInitialWindowSize(HEADER_LENGTH + 20); 737 // The rest data should be sent. 738 verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(false), eq(3), any(Buffer.class), eq(10)); 739 frameHandler().windowUpdate(0, 10); 740 741 // Decrease initial window size to HEADER_LENGTH, since we've already sent 742 // out HEADER_LENGTH + 20 bytes data, the window size should be -20 now. 743 setInitialWindowSize(HEADER_LENGTH); 744 // Get 20 tokens back, still can't send any data. 745 frameHandler().windowUpdate(3, 20); 746 input = new ByteArrayInputStream(new byte[messageLength]); 747 stream.writeMessage(input); 748 stream.flush(); 749 // Only the previous two write operations happened. 750 verify(frameWriter, timeout(TIME_OUT_MS).times(2)) 751 .data(anyBoolean(), anyInt(), any(Buffer.class), anyInt()); 752 753 // Get enough tokens to send the pending message. 754 frameHandler().windowUpdate(3, HEADER_LENGTH + 20); 755 verify(frameWriter, timeout(TIME_OUT_MS)) 756 .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 20)); 757 758 stream.cancel(Status.CANCELLED); 759 listener.waitUntilStreamClosed(); 760 shutdownAndVerify(); 761 } 762 763 @Test outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream()764 public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception { 765 initTransport(); 766 MockStreamListener listener = new MockStreamListener(); 767 OkHttpClientStream stream = 768 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 769 stream.start(listener); 770 int messageLength = 20; 771 setInitialWindowSize(HEADER_LENGTH + 10); 772 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 773 stream.writeMessage(input); 774 stream.flush(); 775 // part of the message can be sent. 776 verify(frameWriter, timeout(TIME_OUT_MS)) 777 .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10)); 778 // Avoid connection flow control. 779 frameHandler().windowUpdate(0, HEADER_LENGTH + 20); 780 781 // Increase initial window size 782 setInitialWindowSize(HEADER_LENGTH + 20); 783 784 // wait until pending frames sent (inOrder doesn't support timeout) 785 verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce()) 786 .data(eq(false), eq(3), any(Buffer.class), eq(10)); 787 // It should ack the settings, then send remaining message. 788 InOrder inOrder = inOrder(frameWriter); 789 inOrder.verify(frameWriter).ackSettings(any(Settings.class)); 790 inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10)); 791 792 stream.cancel(Status.CANCELLED); 793 listener.waitUntilStreamClosed(); 794 shutdownAndVerify(); 795 } 796 797 @Test stopNormally()798 public void stopNormally() throws Exception { 799 initTransport(); 800 MockStreamListener listener1 = new MockStreamListener(); 801 MockStreamListener listener2 = new MockStreamListener(); 802 OkHttpClientStream stream1 = 803 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 804 stream1.start(listener1); 805 OkHttpClientStream stream2 = 806 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 807 stream2.start(listener2); 808 assertEquals(2, activeStreamCount()); 809 clientTransport.shutdown(SHUTDOWN_REASON); 810 811 assertEquals(2, activeStreamCount()); 812 verify(transportListener).transportShutdown(same(SHUTDOWN_REASON)); 813 814 stream1.cancel(Status.CANCELLED); 815 stream2.cancel(Status.CANCELLED); 816 listener1.waitUntilStreamClosed(); 817 listener2.waitUntilStreamClosed(); 818 assertEquals(0, activeStreamCount()); 819 assertEquals(Status.CANCELLED.getCode(), listener1.status.getCode()); 820 assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); 821 verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); 822 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 823 shutdownAndVerify(); 824 } 825 826 @Test receiveGoAway()827 public void receiveGoAway() throws Exception { 828 initTransport(); 829 // start 2 streams. 830 MockStreamListener listener1 = new MockStreamListener(); 831 MockStreamListener listener2 = new MockStreamListener(); 832 OkHttpClientStream stream1 = 833 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 834 stream1.start(listener1); 835 stream1.request(1); 836 OkHttpClientStream stream2 = 837 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 838 stream2.start(listener2); 839 stream2.request(1); 840 assertEquals(2, activeStreamCount()); 841 842 // Receive goAway, max good id is 3. 843 frameHandler().goAway(3, ErrorCode.CANCEL, ByteString.EMPTY); 844 845 // Transport should be in STOPPING state. 846 verify(transportListener).transportShutdown(isA(Status.class)); 847 verify(transportListener, never()).transportTerminated(); 848 849 // Stream 2 should be closed. 850 listener2.waitUntilStreamClosed(); 851 assertEquals(1, activeStreamCount()); 852 assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); 853 854 // New stream should be failed. 855 assertNewStreamFail(); 856 857 // But stream 1 should be able to send. 858 final String sentMessage = "Should I also go away?"; 859 OkHttpClientStream stream = getStream(3); 860 InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8)); 861 assertEquals(22, input.available()); 862 stream.writeMessage(input); 863 stream.flush(); 864 ArgumentCaptor<Buffer> captor = 865 ArgumentCaptor.forClass(Buffer.class); 866 verify(frameWriter, timeout(TIME_OUT_MS)) 867 .data(eq(false), eq(3), captor.capture(), eq(22 + HEADER_LENGTH)); 868 Buffer sentFrame = captor.getValue(); 869 assertEquals(createMessageFrame(sentMessage), sentFrame); 870 871 // And read. 872 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 873 final String receivedMessage = "No, you are fine."; 874 Buffer buffer = createMessageFrame(receivedMessage); 875 frameHandler().data(false, 3, buffer, (int) buffer.size()); 876 frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); 877 listener1.waitUntilStreamClosed(); 878 assertEquals(1, listener1.messages.size()); 879 assertEquals(receivedMessage, listener1.messages.get(0)); 880 881 // The transport should be stopped after all active streams finished. 882 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 883 shutdownAndVerify(); 884 } 885 886 @Test streamIdExhausted()887 public void streamIdExhausted() throws Exception { 888 int startId = Integer.MAX_VALUE - 2; 889 initTransport(startId); 890 891 MockStreamListener listener = new MockStreamListener(); 892 OkHttpClientStream stream = 893 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 894 stream.start(listener); 895 stream.request(1); 896 897 // New stream should be failed. 898 assertNewStreamFail(); 899 900 // The alive stream should be functional, receives a message. 901 frameHandler().headers( 902 false, false, startId, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 903 assertNotNull(listener.headers); 904 String message = "hello"; 905 Buffer buffer = createMessageFrame(message); 906 frameHandler().data(false, startId, buffer, (int) buffer.size()); 907 908 getStream(startId).cancel(Status.CANCELLED); 909 // Receives the second message after be cancelled. 910 buffer = createMessageFrame(message); 911 frameHandler().data(false, startId, buffer, (int) buffer.size()); 912 913 listener.waitUntilStreamClosed(); 914 // Should only have the first message delivered. 915 assertEquals(message, listener.messages.get(0)); 916 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL)); 917 verify(transportListener).transportShutdown(isA(Status.class)); 918 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 919 shutdownAndVerify(); 920 } 921 922 @Test pendingStreamSucceed()923 public void pendingStreamSucceed() throws Exception { 924 initTransport(); 925 setMaxConcurrentStreams(1); 926 final MockStreamListener listener1 = new MockStreamListener(); 927 final MockStreamListener listener2 = new MockStreamListener(); 928 OkHttpClientStream stream1 = 929 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 930 stream1.start(listener1); 931 // The second stream should be pending. 932 OkHttpClientStream stream2 = 933 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 934 stream2.start(listener2); 935 String sentMessage = "hello"; 936 InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8)); 937 assertEquals(5, input.available()); 938 stream2.writeMessage(input); 939 stream2.flush(); 940 stream2.halfClose(); 941 942 waitForStreamPending(1); 943 assertEquals(1, activeStreamCount()); 944 945 // Finish the first stream 946 stream1.cancel(Status.CANCELLED); 947 listener1.waitUntilStreamClosed(); 948 949 // The second stream should be active now, and the pending data should be sent out. 950 assertEquals(1, activeStreamCount()); 951 assertEquals(0, clientTransport.getPendingStreamSize()); 952 ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class); 953 verify(frameWriter, timeout(TIME_OUT_MS)) 954 .data(eq(false), eq(5), captor.capture(), eq(5 + HEADER_LENGTH)); 955 Buffer sentFrame = captor.getValue(); 956 assertEquals(createMessageFrame(sentMessage), sentFrame); 957 verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0)); 958 stream2.cancel(Status.CANCELLED); 959 shutdownAndVerify(); 960 } 961 962 @Test pendingStreamCancelled()963 public void pendingStreamCancelled() throws Exception { 964 initTransport(); 965 setMaxConcurrentStreams(0); 966 MockStreamListener listener = new MockStreamListener(); 967 OkHttpClientStream stream = 968 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 969 stream.start(listener); 970 waitForStreamPending(1); 971 stream.cancel(Status.CANCELLED); 972 // The second cancel should be an no-op. 973 stream.cancel(Status.UNKNOWN); 974 listener.waitUntilStreamClosed(); 975 assertEquals(0, clientTransport.getPendingStreamSize()); 976 assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); 977 shutdownAndVerify(); 978 } 979 980 @Test pendingStreamFailedByGoAway()981 public void pendingStreamFailedByGoAway() throws Exception { 982 initTransport(); 983 setMaxConcurrentStreams(1); 984 final MockStreamListener listener1 = new MockStreamListener(); 985 final MockStreamListener listener2 = new MockStreamListener(); 986 OkHttpClientStream stream1 = 987 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 988 stream1.start(listener1); 989 // The second stream should be pending. 990 OkHttpClientStream stream2 = 991 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 992 stream2.start(listener2); 993 994 waitForStreamPending(1); 995 assertEquals(1, activeStreamCount()); 996 997 // Receives GO_AWAY. 998 frameHandler().goAway(99, ErrorCode.CANCEL, ByteString.EMPTY); 999 1000 listener2.waitUntilStreamClosed(); 1001 assertEquals(Status.CANCELLED.getCode(), listener2.status.getCode()); 1002 assertEquals(0, clientTransport.getPendingStreamSize()); 1003 1004 // active stream should not be affected. 1005 assertEquals(1, activeStreamCount()); 1006 getStream(3).cancel(Status.CANCELLED); 1007 shutdownAndVerify(); 1008 } 1009 1010 @Test pendingStreamSucceedAfterShutdown()1011 public void pendingStreamSucceedAfterShutdown() throws Exception { 1012 initTransport(); 1013 setMaxConcurrentStreams(0); 1014 final MockStreamListener listener = new MockStreamListener(); 1015 // The second stream should be pending. 1016 OkHttpClientStream stream = 1017 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1018 stream.start(listener); 1019 waitForStreamPending(1); 1020 1021 clientTransport.shutdown(SHUTDOWN_REASON); 1022 setMaxConcurrentStreams(1); 1023 verify(frameWriter, timeout(TIME_OUT_MS)) 1024 .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); 1025 assertEquals(1, activeStreamCount()); 1026 stream.cancel(Status.CANCELLED); 1027 shutdownAndVerify(); 1028 } 1029 1030 @Test pendingStreamFailedByIdExhausted()1031 public void pendingStreamFailedByIdExhausted() throws Exception { 1032 int startId = Integer.MAX_VALUE - 4; 1033 initTransport(startId); 1034 setMaxConcurrentStreams(1); 1035 1036 final MockStreamListener listener1 = new MockStreamListener(); 1037 final MockStreamListener listener2 = new MockStreamListener(); 1038 final MockStreamListener listener3 = new MockStreamListener(); 1039 1040 OkHttpClientStream stream1 = 1041 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1042 stream1.start(listener1); 1043 1044 // The second and third stream should be pending. 1045 OkHttpClientStream stream2 = 1046 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1047 stream2.start(listener2); 1048 OkHttpClientStream stream3 = 1049 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1050 stream3.start(listener3); 1051 1052 waitForStreamPending(2); 1053 assertEquals(1, activeStreamCount()); 1054 1055 // Now finish stream1, stream2 should be started and exhaust the id, 1056 // so stream3 should be failed. 1057 stream1.cancel(Status.CANCELLED); 1058 listener1.waitUntilStreamClosed(); 1059 listener3.waitUntilStreamClosed(); 1060 assertEquals(Status.UNAVAILABLE.getCode(), listener3.status.getCode()); 1061 assertEquals(0, clientTransport.getPendingStreamSize()); 1062 assertEquals(1, activeStreamCount()); 1063 stream2 = getStream(startId + 2); 1064 stream2.cancel(Status.CANCELLED); 1065 shutdownAndVerify(); 1066 } 1067 1068 @Test receivingWindowExceeded()1069 public void receivingWindowExceeded() throws Exception { 1070 initTransport(); 1071 MockStreamListener listener = new MockStreamListener(); 1072 OkHttpClientStream stream = 1073 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1074 stream.start(listener); 1075 stream.request(1); 1076 1077 frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 1078 1079 int messageLength = Utils.DEFAULT_WINDOW_SIZE + 1; 1080 byte[] fakeMessage = new byte[messageLength]; 1081 Buffer buffer = createMessageFrame(fakeMessage); 1082 int messageFrameLength = (int) buffer.size(); 1083 frameHandler().data(false, 3, buffer, messageFrameLength); 1084 1085 listener.waitUntilStreamClosed(); 1086 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1087 assertEquals("Received data size exceeded our receiving window size", 1088 listener.status.getDescription()); 1089 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.FLOW_CONTROL_ERROR)); 1090 shutdownAndVerify(); 1091 } 1092 1093 @Test unaryHeadersShouldNotBeFlushed()1094 public void unaryHeadersShouldNotBeFlushed() throws Exception { 1095 // By default the method is a Unary call 1096 shouldHeadersBeFlushed(false); 1097 shutdownAndVerify(); 1098 } 1099 1100 @Test serverStreamingHeadersShouldNotBeFlushed()1101 public void serverStreamingHeadersShouldNotBeFlushed() throws Exception { 1102 method = method.toBuilder().setType(MethodType.SERVER_STREAMING).build(); 1103 shouldHeadersBeFlushed(false); 1104 shutdownAndVerify(); 1105 } 1106 1107 @Test clientStreamingHeadersShouldBeFlushed()1108 public void clientStreamingHeadersShouldBeFlushed() throws Exception { 1109 method = method.toBuilder().setType(MethodType.CLIENT_STREAMING).build(); 1110 shouldHeadersBeFlushed(true); 1111 shutdownAndVerify(); 1112 } 1113 1114 @Test duplexStreamingHeadersShouldNotBeFlushed()1115 public void duplexStreamingHeadersShouldNotBeFlushed() throws Exception { 1116 method = method.toBuilder().setType(MethodType.BIDI_STREAMING).build(); 1117 shouldHeadersBeFlushed(true); 1118 shutdownAndVerify(); 1119 } 1120 shouldHeadersBeFlushed(boolean shouldBeFlushed)1121 private void shouldHeadersBeFlushed(boolean shouldBeFlushed) throws Exception { 1122 initTransport(); 1123 MockStreamListener listener = new MockStreamListener(); 1124 OkHttpClientStream stream = 1125 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1126 stream.start(listener); 1127 verify(frameWriter, timeout(TIME_OUT_MS)).synStream( 1128 eq(false), eq(false), eq(3), eq(0), Matchers.anyListOf(Header.class)); 1129 if (shouldBeFlushed) { 1130 verify(frameWriter, timeout(TIME_OUT_MS)).flush(); 1131 } else { 1132 verify(frameWriter, timeout(TIME_OUT_MS).times(0)).flush(); 1133 } 1134 stream.cancel(Status.CANCELLED); 1135 } 1136 1137 @Test receiveDataWithoutHeader()1138 public void receiveDataWithoutHeader() throws Exception { 1139 initTransport(); 1140 MockStreamListener listener = new MockStreamListener(); 1141 OkHttpClientStream stream = 1142 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1143 stream.start(listener); 1144 stream.request(1); 1145 Buffer buffer = createMessageFrame(new byte[1]); 1146 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1147 1148 // Trigger the failure by a trailer. 1149 frameHandler().headers( 1150 true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); 1151 1152 listener.waitUntilStreamClosed(); 1153 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1154 assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); 1155 assertEquals(0, listener.messages.size()); 1156 shutdownAndVerify(); 1157 } 1158 1159 @Test receiveDataWithoutHeaderAndTrailer()1160 public void receiveDataWithoutHeaderAndTrailer() throws Exception { 1161 initTransport(); 1162 MockStreamListener listener = new MockStreamListener(); 1163 OkHttpClientStream stream = 1164 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1165 stream.start(listener); 1166 stream.request(1); 1167 Buffer buffer = createMessageFrame(new byte[1]); 1168 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1169 1170 // Trigger the failure by a data frame. 1171 buffer = createMessageFrame(new byte[1]); 1172 frameHandler().data(true, 3, buffer, (int) buffer.size()); 1173 1174 listener.waitUntilStreamClosed(); 1175 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1176 assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); 1177 assertEquals(0, listener.messages.size()); 1178 shutdownAndVerify(); 1179 } 1180 1181 @Test receiveLongEnoughDataWithoutHeaderAndTrailer()1182 public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception { 1183 initTransport(); 1184 MockStreamListener listener = new MockStreamListener(); 1185 OkHttpClientStream stream = 1186 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1187 stream.start(listener); 1188 stream.request(1); 1189 Buffer buffer = createMessageFrame(new byte[1000]); 1190 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1191 1192 // Once we receive enough detail, we cancel the stream. so we should have sent cancel. 1193 verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL)); 1194 1195 listener.waitUntilStreamClosed(); 1196 assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); 1197 assertTrue(listener.status.getDescription().startsWith("headers not received before payload")); 1198 assertEquals(0, listener.messages.size()); 1199 shutdownAndVerify(); 1200 } 1201 1202 @Test receiveDataForUnknownStreamUpdateConnectionWindow()1203 public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception { 1204 initTransport(); 1205 MockStreamListener listener = new MockStreamListener(); 1206 OkHttpClientStream stream = 1207 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1208 stream.start(listener); 1209 stream.cancel(Status.CANCELLED); 1210 1211 Buffer buffer = createMessageFrame( 1212 new byte[Utils.DEFAULT_WINDOW_SIZE / 2 + 1]); 1213 frameHandler().data(false, 3, buffer, (int) buffer.size()); 1214 // Should still update the connection window even stream 3 is gone. 1215 verify(frameWriter, timeout(TIME_OUT_MS)).windowUpdate(0, 1216 HEADER_LENGTH + Utils.DEFAULT_WINDOW_SIZE / 2 + 1); 1217 buffer = createMessageFrame( 1218 new byte[Utils.DEFAULT_WINDOW_SIZE / 2 + 1]); 1219 1220 // This should kill the connection, since we never created stream 5. 1221 frameHandler().data(false, 5, buffer, (int) buffer.size()); 1222 verify(frameWriter, timeout(TIME_OUT_MS)) 1223 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 1224 verify(transportListener).transportShutdown(isA(Status.class)); 1225 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1226 shutdownAndVerify(); 1227 } 1228 1229 @Test receiveWindowUpdateForUnknownStream()1230 public void receiveWindowUpdateForUnknownStream() throws Exception { 1231 initTransport(); 1232 MockStreamListener listener = new MockStreamListener(); 1233 OkHttpClientStream stream = 1234 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1235 stream.start(listener); 1236 stream.cancel(Status.CANCELLED); 1237 // This should be ignored. 1238 frameHandler().windowUpdate(3, 73); 1239 listener.waitUntilStreamClosed(); 1240 // This should kill the connection, since we never created stream 5. 1241 frameHandler().windowUpdate(5, 73); 1242 verify(frameWriter, timeout(TIME_OUT_MS)) 1243 .goAway(eq(0), eq(ErrorCode.PROTOCOL_ERROR), any(byte[].class)); 1244 verify(transportListener).transportShutdown(isA(Status.class)); 1245 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1246 shutdownAndVerify(); 1247 } 1248 1249 @Test shouldBeInitiallyReady()1250 public void shouldBeInitiallyReady() throws Exception { 1251 initTransport(); 1252 MockStreamListener listener = new MockStreamListener(); 1253 OkHttpClientStream stream = 1254 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1255 stream.start(listener); 1256 assertTrue(stream.isReady()); 1257 assertTrue(listener.isOnReadyCalled()); 1258 stream.cancel(Status.CANCELLED); 1259 assertFalse(stream.isReady()); 1260 shutdownAndVerify(); 1261 } 1262 1263 @Test notifyOnReady()1264 public void notifyOnReady() throws Exception { 1265 initTransport(); 1266 // exactly one byte below the threshold 1267 int messageLength = 1268 AbstractStream.TransportState.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1; 1269 setInitialWindowSize(0); 1270 MockStreamListener listener = new MockStreamListener(); 1271 OkHttpClientStream stream = 1272 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1273 stream.start(listener); 1274 assertTrue(stream.isReady()); 1275 // Be notified at the beginning. 1276 assertTrue(listener.isOnReadyCalled()); 1277 1278 // Write a message that will not exceed the notification threshold and queue it. 1279 InputStream input = new ByteArrayInputStream(new byte[messageLength]); 1280 stream.writeMessage(input); 1281 stream.flush(); 1282 assertTrue(stream.isReady()); 1283 1284 // Write another two messages, still be queued. 1285 input = new ByteArrayInputStream(new byte[messageLength]); 1286 stream.writeMessage(input); 1287 stream.flush(); 1288 assertFalse(stream.isReady()); 1289 input = new ByteArrayInputStream(new byte[messageLength]); 1290 stream.writeMessage(input); 1291 stream.flush(); 1292 assertFalse(stream.isReady()); 1293 1294 // Let the first message out. 1295 frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength); 1296 frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength); 1297 assertFalse(stream.isReady()); 1298 assertFalse(listener.isOnReadyCalled()); 1299 1300 // Let the second message out. 1301 frameHandler().windowUpdate(0, HEADER_LENGTH + messageLength); 1302 frameHandler().windowUpdate(3, HEADER_LENGTH + messageLength); 1303 assertTrue(stream.isReady()); 1304 assertTrue(listener.isOnReadyCalled()); 1305 1306 stream.cancel(Status.CANCELLED); 1307 shutdownAndVerify(); 1308 } 1309 1310 @Test transportReady()1311 public void transportReady() throws Exception { 1312 initTransport(); 1313 verifyZeroInteractions(transportListener); 1314 frameHandler().settings(false, new Settings()); 1315 verify(transportListener).transportReady(); 1316 shutdownAndVerify(); 1317 } 1318 1319 @Test ping()1320 public void ping() throws Exception { 1321 initTransport(); 1322 PingCallbackImpl callback1 = new PingCallbackImpl(); 1323 clientTransport.ping(callback1, MoreExecutors.directExecutor()); 1324 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1325 // add'l ping will be added as listener to outstanding operation 1326 PingCallbackImpl callback2 = new PingCallbackImpl(); 1327 clientTransport.ping(callback2, MoreExecutors.directExecutor()); 1328 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1329 1330 ArgumentCaptor<Integer> captor1 = ArgumentCaptor.forClass(int.class); 1331 ArgumentCaptor<Integer> captor2 = ArgumentCaptor.forClass(int.class); 1332 verify(frameWriter, timeout(TIME_OUT_MS)).ping(eq(false), captor1.capture(), captor2.capture()); 1333 // callback not invoked until we see acknowledgement 1334 assertEquals(0, callback1.invocationCount); 1335 assertEquals(0, callback2.invocationCount); 1336 1337 int payload1 = captor1.getValue(); 1338 int payload2 = captor2.getValue(); 1339 // getting a bad ack won't complete the future 1340 // to make the ack "bad", we modify the payload so it doesn't match 1341 frameHandler().ping(true, payload1, payload2 - 1); 1342 // operation not complete because ack was wrong 1343 assertEquals(0, callback1.invocationCount); 1344 assertEquals(0, callback2.invocationCount); 1345 1346 nanoTime += 10101; 1347 1348 // reading the proper response should complete the future 1349 frameHandler().ping(true, payload1, payload2); 1350 assertEquals(1, callback1.invocationCount); 1351 assertEquals(10101, callback1.roundTripTime); 1352 assertNull(callback1.failureCause); 1353 // callback2 piggy-backed on same operation 1354 assertEquals(1, callback2.invocationCount); 1355 assertEquals(10101, callback2.roundTripTime); 1356 assertNull(callback2.failureCause); 1357 1358 // now that previous ping is done, next request returns a different future 1359 callback1 = new PingCallbackImpl(); 1360 clientTransport.ping(callback1, MoreExecutors.directExecutor()); 1361 assertEquals(2, getTransportStats(clientTransport).keepAlivesSent); 1362 assertEquals(0, callback1.invocationCount); 1363 shutdownAndVerify(); 1364 } 1365 1366 @Test ping_failsWhenTransportShutdown()1367 public void ping_failsWhenTransportShutdown() throws Exception { 1368 initTransport(); 1369 PingCallbackImpl callback = new PingCallbackImpl(); 1370 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1371 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1372 assertEquals(0, callback.invocationCount); 1373 1374 clientTransport.shutdown(SHUTDOWN_REASON); 1375 // ping failed on channel shutdown 1376 assertEquals(1, callback.invocationCount); 1377 assertTrue(callback.failureCause instanceof StatusException); 1378 assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); 1379 1380 // now that handler is in terminal state, all future pings fail immediately 1381 callback = new PingCallbackImpl(); 1382 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1383 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1384 assertEquals(1, callback.invocationCount); 1385 assertTrue(callback.failureCause instanceof StatusException); 1386 assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus()); 1387 shutdownAndVerify(); 1388 } 1389 1390 @Test ping_failsIfTransportFails()1391 public void ping_failsIfTransportFails() throws Exception { 1392 initTransport(); 1393 PingCallbackImpl callback = new PingCallbackImpl(); 1394 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1395 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1396 assertEquals(0, callback.invocationCount); 1397 1398 clientTransport.onException(new IOException()); 1399 // ping failed on error 1400 assertEquals(1, callback.invocationCount); 1401 assertTrue(callback.failureCause instanceof StatusException); 1402 assertEquals(Status.Code.UNAVAILABLE, 1403 ((StatusException) callback.failureCause).getStatus().getCode()); 1404 1405 // now that handler is in terminal state, all future pings fail immediately 1406 callback = new PingCallbackImpl(); 1407 clientTransport.ping(callback, MoreExecutors.directExecutor()); 1408 assertEquals(1, getTransportStats(clientTransport).keepAlivesSent); 1409 assertEquals(1, callback.invocationCount); 1410 assertTrue(callback.failureCause instanceof StatusException); 1411 assertEquals(Status.Code.UNAVAILABLE, 1412 ((StatusException) callback.failureCause).getStatus().getCode()); 1413 shutdownAndVerify(); 1414 } 1415 1416 @Test writeBeforeConnected()1417 public void writeBeforeConnected() throws Exception { 1418 initTransportAndDelayConnected(); 1419 final String message = "Hello Server"; 1420 MockStreamListener listener = new MockStreamListener(); 1421 OkHttpClientStream stream = 1422 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1423 stream.start(listener); 1424 InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); 1425 stream.writeMessage(input); 1426 stream.flush(); 1427 // The message should be queued. 1428 verifyNoMoreInteractions(frameWriter); 1429 1430 allowTransportConnected(); 1431 1432 // The queued message should be sent out. 1433 ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class); 1434 verify(frameWriter, timeout(TIME_OUT_MS)) 1435 .data(eq(false), eq(3), captor.capture(), eq(12 + HEADER_LENGTH)); 1436 Buffer sentFrame = captor.getValue(); 1437 assertEquals(createMessageFrame(message), sentFrame); 1438 stream.cancel(Status.CANCELLED); 1439 shutdownAndVerify(); 1440 } 1441 1442 @Test cancelBeforeConnected()1443 public void cancelBeforeConnected() throws Exception { 1444 initTransportAndDelayConnected(); 1445 final String message = "Hello Server"; 1446 MockStreamListener listener = new MockStreamListener(); 1447 OkHttpClientStream stream = 1448 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1449 stream.start(listener); 1450 InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8)); 1451 stream.writeMessage(input); 1452 stream.flush(); 1453 stream.cancel(Status.CANCELLED); 1454 verifyNoMoreInteractions(frameWriter); 1455 1456 allowTransportConnected(); 1457 verifyNoMoreInteractions(frameWriter); 1458 shutdownAndVerify(); 1459 } 1460 1461 @Test shutdownDuringConnecting()1462 public void shutdownDuringConnecting() throws Exception { 1463 initTransportAndDelayConnected(); 1464 MockStreamListener listener = new MockStreamListener(); 1465 OkHttpClientStream stream = 1466 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1467 stream.start(listener); 1468 clientTransport.shutdown(SHUTDOWN_REASON); 1469 allowTransportConnected(); 1470 1471 // The new stream should be failed, but not the pending stream. 1472 assertNewStreamFail(); 1473 verify(frameWriter, timeout(TIME_OUT_MS)) 1474 .synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader()); 1475 assertEquals(1, activeStreamCount()); 1476 stream.cancel(Status.CANCELLED); 1477 listener.waitUntilStreamClosed(); 1478 assertEquals(Status.CANCELLED.getCode(), listener.status.getCode()); 1479 shutdownAndVerify(); 1480 } 1481 1482 @Test invalidAuthorityPropagates()1483 public void invalidAuthorityPropagates() { 1484 clientTransport = new OkHttpClientTransport( 1485 new InetSocketAddress("host", 1234), 1486 "invalid_authority", 1487 "userAgent", 1488 executor, 1489 sslSocketFactory, 1490 hostnameVerifier, 1491 ConnectionSpec.CLEARTEXT, 1492 DEFAULT_MAX_MESSAGE_SIZE, 1493 NO_PROXY, 1494 tooManyPingsRunnable, 1495 transportTracer); 1496 1497 String host = clientTransport.getOverridenHost(); 1498 int port = clientTransport.getOverridenPort(); 1499 1500 assertEquals("invalid_authority", host); 1501 assertEquals(1234, port); 1502 } 1503 1504 @Test unreachableServer()1505 public void unreachableServer() throws Exception { 1506 clientTransport = new OkHttpClientTransport( 1507 new InetSocketAddress("localhost", 0), 1508 "authority", 1509 "userAgent", 1510 executor, 1511 sslSocketFactory, 1512 hostnameVerifier, 1513 ConnectionSpec.CLEARTEXT, 1514 DEFAULT_MAX_MESSAGE_SIZE, 1515 NO_PROXY, 1516 tooManyPingsRunnable, 1517 new TransportTracer()); 1518 1519 ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); 1520 clientTransport.start(listener); 1521 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1522 verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1523 Status status = captor.getValue(); 1524 assertEquals(Status.UNAVAILABLE.getCode(), status.getCode()); 1525 assertTrue(status.getCause().toString(), status.getCause() instanceof IOException); 1526 1527 MockStreamListener streamListener = new MockStreamListener(); 1528 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT).start(streamListener); 1529 streamListener.waitUntilStreamClosed(); 1530 assertEquals(Status.UNAVAILABLE.getCode(), streamListener.status.getCode()); 1531 } 1532 1533 @Test proxy_200()1534 public void proxy_200() throws Exception { 1535 ServerSocket serverSocket = new ServerSocket(0); 1536 clientTransport = new OkHttpClientTransport( 1537 InetSocketAddress.createUnresolved("theservice", 80), 1538 "authority", 1539 "userAgent", 1540 executor, 1541 sslSocketFactory, 1542 hostnameVerifier, 1543 ConnectionSpec.CLEARTEXT, 1544 DEFAULT_MAX_MESSAGE_SIZE, 1545 new ProxyParameters( 1546 (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW), 1547 tooManyPingsRunnable, 1548 transportTracer); 1549 clientTransport.start(transportListener); 1550 1551 Socket sock = serverSocket.accept(); 1552 serverSocket.close(); 1553 1554 BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8)); 1555 assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine()); 1556 assertEquals("Host: theservice:80", reader.readLine()); 1557 while (!"".equals(reader.readLine())) {} 1558 1559 sock.getOutputStream().write("HTTP/1.1 200 OK\r\nServer: test\r\n\r\n".getBytes(UTF_8)); 1560 sock.getOutputStream().flush(); 1561 1562 assertEquals("PRI * HTTP/2.0", reader.readLine()); 1563 assertEquals("", reader.readLine()); 1564 assertEquals("SM", reader.readLine()); 1565 assertEquals("", reader.readLine()); 1566 1567 // Empty SETTINGS 1568 sock.getOutputStream().write(new byte[] {0, 0, 0, 0, 0x4, 0}); 1569 // GOAWAY 1570 sock.getOutputStream().write(new byte[] { 1571 0, 0, 0, 8, 0x7, 0, 1572 0, 0, 0, 0, // last stream id 1573 0, 0, 0, 0, // error code 1574 }); 1575 sock.getOutputStream().flush(); 1576 1577 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class)); 1578 while (sock.getInputStream().read() != -1) {} 1579 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1580 sock.close(); 1581 } 1582 1583 @Test proxy_500()1584 public void proxy_500() throws Exception { 1585 ServerSocket serverSocket = new ServerSocket(0); 1586 clientTransport = new OkHttpClientTransport( 1587 InetSocketAddress.createUnresolved("theservice", 80), 1588 "authority", 1589 "userAgent", 1590 executor, 1591 sslSocketFactory, 1592 hostnameVerifier, 1593 ConnectionSpec.CLEARTEXT, 1594 DEFAULT_MAX_MESSAGE_SIZE, 1595 new ProxyParameters( 1596 (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW), 1597 tooManyPingsRunnable, 1598 transportTracer); 1599 clientTransport.start(transportListener); 1600 1601 Socket sock = serverSocket.accept(); 1602 serverSocket.close(); 1603 1604 BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8)); 1605 assertEquals("CONNECT theservice:80 HTTP/1.1", reader.readLine()); 1606 assertEquals("Host: theservice:80", reader.readLine()); 1607 while (!"".equals(reader.readLine())) {} 1608 1609 final String errorText = "text describing error"; 1610 sock.getOutputStream().write("HTTP/1.1 500 OH NO\r\n\r\n".getBytes(UTF_8)); 1611 sock.getOutputStream().write(errorText.getBytes(UTF_8)); 1612 sock.getOutputStream().flush(); 1613 sock.shutdownOutput(); 1614 1615 assertEquals(-1, sock.getInputStream().read()); 1616 1617 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1618 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1619 Status error = captor.getValue(); 1620 assertTrue("Status didn't contain error code: " + captor.getValue(), 1621 error.getDescription().contains("500")); 1622 assertTrue("Status didn't contain error description: " + captor.getValue(), 1623 error.getDescription().contains("OH NO")); 1624 assertTrue("Status didn't contain error text: " + captor.getValue(), 1625 error.getDescription().contains(errorText)); 1626 assertEquals("Not UNAVAILABLE: " + captor.getValue(), 1627 Status.UNAVAILABLE.getCode(), error.getCode()); 1628 sock.close(); 1629 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1630 } 1631 1632 @Test proxy_immediateServerClose()1633 public void proxy_immediateServerClose() throws Exception { 1634 ServerSocket serverSocket = new ServerSocket(0); 1635 clientTransport = new OkHttpClientTransport( 1636 InetSocketAddress.createUnresolved("theservice", 80), 1637 "authority", 1638 "userAgent", 1639 executor, 1640 sslSocketFactory, 1641 hostnameVerifier, 1642 ConnectionSpec.CLEARTEXT, 1643 DEFAULT_MAX_MESSAGE_SIZE, 1644 new ProxyParameters( 1645 (InetSocketAddress) serverSocket.getLocalSocketAddress(), NO_USER, NO_PW), 1646 tooManyPingsRunnable, 1647 transportTracer); 1648 clientTransport.start(transportListener); 1649 1650 Socket sock = serverSocket.accept(); 1651 serverSocket.close(); 1652 sock.close(); 1653 1654 ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); 1655 verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture()); 1656 Status error = captor.getValue(); 1657 assertTrue("Status didn't contain proxy: " + captor.getValue(), 1658 error.getDescription().contains("proxy")); 1659 assertEquals("Not UNAVAILABLE: " + captor.getValue(), 1660 Status.UNAVAILABLE.getCode(), error.getCode()); 1661 verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated(); 1662 } 1663 1664 @Test goAway_notUtf8()1665 public void goAway_notUtf8() throws Exception { 1666 initTransport(); 1667 // 0xFF is never permitted in UTF-8. 0xF0 should have 3 continuations following, and 0x0a isn't 1668 // a continuation. 1669 frameHandler().goAway( 1670 0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.of((byte) 0xFF, (byte) 0xF0, (byte) 0x0a)); 1671 1672 shutdownAndVerify(); 1673 } 1674 1675 @Test goAway_notTooManyPings()1676 public void goAway_notTooManyPings() throws Exception { 1677 final AtomicBoolean run = new AtomicBoolean(); 1678 tooManyPingsRunnable = new Runnable() { 1679 @Override 1680 public void run() { 1681 run.set(true); 1682 } 1683 }; 1684 initTransport(); 1685 frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("not_many_pings")); 1686 assertFalse(run.get()); 1687 1688 shutdownAndVerify(); 1689 } 1690 1691 @Test goAway_tooManyPings()1692 public void goAway_tooManyPings() throws Exception { 1693 final AtomicBoolean run = new AtomicBoolean(); 1694 tooManyPingsRunnable = new Runnable() { 1695 @Override 1696 public void run() { 1697 run.set(true); 1698 } 1699 }; 1700 initTransport(); 1701 frameHandler().goAway(0, ErrorCode.ENHANCE_YOUR_CALM, ByteString.encodeUtf8("too_many_pings")); 1702 assertTrue(run.get()); 1703 1704 shutdownAndVerify(); 1705 } 1706 1707 @Test goAway_streamListenerRpcProgress()1708 public void goAway_streamListenerRpcProgress() throws Exception { 1709 initTransport(); 1710 setMaxConcurrentStreams(2); 1711 MockStreamListener listener1 = new MockStreamListener(); 1712 MockStreamListener listener2 = new MockStreamListener(); 1713 MockStreamListener listener3 = new MockStreamListener(); 1714 OkHttpClientStream stream1 = 1715 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1716 stream1.start(listener1); 1717 OkHttpClientStream stream2 = 1718 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1719 stream2.start(listener2); 1720 OkHttpClientStream stream3 = 1721 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1722 stream3.start(listener3); 1723 waitForStreamPending(1); 1724 1725 assertEquals(2, activeStreamCount()); 1726 assertContainStream(DEFAULT_START_STREAM_ID); 1727 assertContainStream(DEFAULT_START_STREAM_ID + 2); 1728 1729 frameHandler() 1730 .goAway(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL, ByteString.encodeUtf8("blablabla")); 1731 1732 listener2.waitUntilStreamClosed(); 1733 listener3.waitUntilStreamClosed(); 1734 assertNull(listener1.rpcProgress); 1735 assertEquals(REFUSED, listener2.rpcProgress); 1736 assertEquals(REFUSED, listener3.rpcProgress); 1737 assertEquals(1, activeStreamCount()); 1738 assertContainStream(DEFAULT_START_STREAM_ID); 1739 1740 getStream(DEFAULT_START_STREAM_ID).cancel(Status.CANCELLED); 1741 1742 listener1.waitUntilStreamClosed(); 1743 assertEquals(PROCESSED, listener1.rpcProgress); 1744 1745 shutdownAndVerify(); 1746 } 1747 1748 @Test reset_streamListenerRpcProgress()1749 public void reset_streamListenerRpcProgress() throws Exception { 1750 initTransport(); 1751 MockStreamListener listener1 = new MockStreamListener(); 1752 MockStreamListener listener2 = new MockStreamListener(); 1753 MockStreamListener listener3 = new MockStreamListener(); 1754 OkHttpClientStream stream1 = 1755 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1756 stream1.start(listener1); 1757 OkHttpClientStream stream2 = 1758 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1759 stream2.start(listener2); 1760 OkHttpClientStream stream3 = 1761 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1762 stream3.start(listener3); 1763 1764 assertEquals(3, activeStreamCount()); 1765 assertContainStream(DEFAULT_START_STREAM_ID); 1766 assertContainStream(DEFAULT_START_STREAM_ID + 2); 1767 assertContainStream(DEFAULT_START_STREAM_ID + 4); 1768 1769 frameHandler().rstStream(DEFAULT_START_STREAM_ID + 2, ErrorCode.REFUSED_STREAM); 1770 1771 listener2.waitUntilStreamClosed(); 1772 assertNull(listener1.rpcProgress); 1773 assertEquals(REFUSED, listener2.rpcProgress); 1774 assertNull(listener3.rpcProgress); 1775 1776 frameHandler().rstStream(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL); 1777 listener1.waitUntilStreamClosed(); 1778 assertEquals(PROCESSED, listener1.rpcProgress); 1779 assertNull(listener3.rpcProgress); 1780 1781 getStream(DEFAULT_START_STREAM_ID + 4).cancel(Status.CANCELLED); 1782 1783 listener3.waitUntilStreamClosed(); 1784 assertEquals(PROCESSED, listener3.rpcProgress); 1785 1786 shutdownAndVerify(); 1787 } 1788 activeStreamCount()1789 private int activeStreamCount() { 1790 return clientTransport.getActiveStreams().length; 1791 } 1792 getStream(int streamId)1793 private OkHttpClientStream getStream(int streamId) { 1794 return clientTransport.getStream(streamId); 1795 } 1796 assertContainStream(int streamId)1797 void assertContainStream(int streamId) { 1798 assertNotNull(clientTransport.getStream(streamId)); 1799 } 1800 frameHandler()1801 private ClientFrameHandler frameHandler() throws Exception { 1802 return clientTransport.getHandler(); 1803 } 1804 waitForStreamPending(int expected)1805 private void waitForStreamPending(int expected) throws Exception { 1806 int duration = TIME_OUT_MS / 10; 1807 for (int i = 0; i < 10; i++) { 1808 if (clientTransport.getPendingStreamSize() == expected) { 1809 return; 1810 } 1811 Thread.sleep(duration); 1812 } 1813 assertEquals(expected, clientTransport.getPendingStreamSize()); 1814 } 1815 assertNewStreamFail()1816 private void assertNewStreamFail() throws Exception { 1817 MockStreamListener listener = new MockStreamListener(); 1818 OkHttpClientStream stream = 1819 clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); 1820 stream.start(listener); 1821 listener.waitUntilStreamClosed(); 1822 assertFalse(listener.status.isOk()); 1823 } 1824 setMaxConcurrentStreams(int num)1825 private void setMaxConcurrentStreams(int num) throws Exception { 1826 Settings settings = new Settings(); 1827 OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, num); 1828 frameHandler().settings(false, settings); 1829 } 1830 setInitialWindowSize(int size)1831 private void setInitialWindowSize(int size) throws Exception { 1832 Settings settings = new Settings(); 1833 OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, size); 1834 frameHandler().settings(false, settings); 1835 } 1836 createMessageFrame(String message)1837 private static Buffer createMessageFrame(String message) { 1838 return createMessageFrame(message.getBytes(UTF_8)); 1839 } 1840 createMessageFrame(byte[] message)1841 private static Buffer createMessageFrame(byte[] message) { 1842 Buffer buffer = new Buffer(); 1843 buffer.writeByte(0 /* UNCOMPRESSED */); 1844 buffer.writeInt(message.length); 1845 buffer.write(message); 1846 return buffer; 1847 } 1848 grpcResponseHeaders()1849 private List<Header> grpcResponseHeaders() { 1850 return ImmutableList.of( 1851 new Header(":status", "200"), 1852 CONTENT_TYPE_HEADER); 1853 } 1854 grpcResponseTrailers()1855 private List<Header> grpcResponseTrailers() { 1856 return ImmutableList.of( 1857 new Header(InternalStatus.CODE_KEY.name(), "0"), 1858 // Adding Content-Type and :status for testing responses with only a single HEADERS frame. 1859 new Header(":status", "200"), 1860 CONTENT_TYPE_HEADER); 1861 } 1862 anyListHeader()1863 private static List<Header> anyListHeader() { 1864 return any(); 1865 } 1866 1867 private static class MockFrameReader implements FrameReader { 1868 final CountDownLatch closed = new CountDownLatch(1); 1869 1870 enum Result { 1871 THROW_EXCEPTION, 1872 RETURN_FALSE, 1873 THROW_ERROR 1874 } 1875 1876 final LinkedBlockingQueue<Result> nextResults = new LinkedBlockingQueue<Result>(); 1877 1878 @Override close()1879 public void close() throws IOException { 1880 closed.countDown(); 1881 } 1882 assertClosed()1883 void assertClosed() { 1884 try { 1885 if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) { 1886 fail("Failed waiting frame reader to be closed."); 1887 } 1888 } catch (InterruptedException e) { 1889 Thread.currentThread().interrupt(); 1890 fail("Interrupted while waiting for frame reader to be closed."); 1891 } 1892 } 1893 1894 // The wait is safe; nextFrame is called in a loop and can have spurious wakeups 1895 @SuppressWarnings("WaitNotInLoop") 1896 @Override nextFrame(Handler handler)1897 public boolean nextFrame(Handler handler) throws IOException { 1898 Result result; 1899 try { 1900 result = nextResults.take(); 1901 } catch (InterruptedException e) { 1902 Thread.currentThread().interrupt(); 1903 throw new IOException(e); 1904 } 1905 switch (result) { 1906 case THROW_EXCEPTION: 1907 throw new IOException(NETWORK_ISSUE_MESSAGE); 1908 case RETURN_FALSE: 1909 return false; 1910 case THROW_ERROR: 1911 throw new Error(ERROR_MESSAGE); 1912 default: 1913 throw new UnsupportedOperationException("unimplemented: " + result); 1914 } 1915 } 1916 throwIoExceptionForNextFrame()1917 void throwIoExceptionForNextFrame() { 1918 nextResults.add(Result.THROW_EXCEPTION); 1919 } 1920 throwErrorForNextFrame()1921 void throwErrorForNextFrame() { 1922 nextResults.add(Result.THROW_ERROR); 1923 } 1924 nextFrameAtEndOfStream()1925 void nextFrameAtEndOfStream() { 1926 nextResults.add(Result.RETURN_FALSE); 1927 } 1928 1929 @Override readConnectionPreface()1930 public void readConnectionPreface() throws IOException { 1931 // not used. 1932 } 1933 } 1934 1935 private static class MockStreamListener implements ClientStreamListener { 1936 Status status; 1937 Metadata headers; 1938 Metadata trailers; 1939 RpcProgress rpcProgress; 1940 CountDownLatch closed = new CountDownLatch(1); 1941 ArrayList<String> messages = new ArrayList<>(); 1942 boolean onReadyCalled; 1943 MockStreamListener()1944 MockStreamListener() { 1945 } 1946 1947 @Override headersRead(Metadata headers)1948 public void headersRead(Metadata headers) { 1949 this.headers = headers; 1950 } 1951 1952 @Override messagesAvailable(MessageProducer producer)1953 public void messagesAvailable(MessageProducer producer) { 1954 InputStream inputStream; 1955 while ((inputStream = producer.next()) != null) { 1956 String msg = getContent(inputStream); 1957 if (msg != null) { 1958 messages.add(msg); 1959 } 1960 } 1961 } 1962 1963 @Override closed(Status status, Metadata trailers)1964 public void closed(Status status, Metadata trailers) { 1965 closed(status, PROCESSED, trailers); 1966 } 1967 1968 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)1969 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 1970 this.status = status; 1971 this.trailers = trailers; 1972 this.rpcProgress = rpcProgress; 1973 closed.countDown(); 1974 } 1975 1976 @Override onReady()1977 public void onReady() { 1978 onReadyCalled = true; 1979 } 1980 isOnReadyCalled()1981 boolean isOnReadyCalled() { 1982 boolean value = onReadyCalled; 1983 onReadyCalled = false; 1984 return value; 1985 } 1986 waitUntilStreamClosed()1987 void waitUntilStreamClosed() throws InterruptedException { 1988 if (!closed.await(TIME_OUT_MS, TimeUnit.MILLISECONDS)) { 1989 fail("Failed waiting stream to be closed."); 1990 } 1991 } 1992 1993 @SuppressWarnings("Finally") // We don't care about suppressed exceptions in the test getContent(InputStream message)1994 static String getContent(InputStream message) { 1995 BufferedReader br = new BufferedReader(new InputStreamReader(message, UTF_8)); 1996 try { 1997 // Only one line message is used in this test. 1998 return br.readLine(); 1999 } catch (IOException e) { 2000 return null; 2001 } finally { 2002 try { 2003 message.close(); 2004 } catch (IOException e) { 2005 // Ignore 2006 } 2007 } 2008 } 2009 } 2010 2011 private static class MockSocket extends Socket { 2012 MockFrameReader frameReader; 2013 MockSocket(MockFrameReader frameReader)2014 MockSocket(MockFrameReader frameReader) { 2015 this.frameReader = frameReader; 2016 } 2017 2018 @Override close()2019 public synchronized void close() { 2020 frameReader.nextFrameAtEndOfStream(); 2021 } 2022 2023 @Override getLocalSocketAddress()2024 public SocketAddress getLocalSocketAddress() { 2025 return InetSocketAddress.createUnresolved("localhost", 4000); 2026 } 2027 } 2028 2029 static class PingCallbackImpl implements ClientTransport.PingCallback { 2030 int invocationCount; 2031 long roundTripTime; 2032 Throwable failureCause; 2033 2034 @Override onSuccess(long roundTripTimeNanos)2035 public void onSuccess(long roundTripTimeNanos) { 2036 invocationCount++; 2037 this.roundTripTime = roundTripTimeNanos; 2038 } 2039 2040 @Override onFailure(Throwable cause)2041 public void onFailure(Throwable cause) { 2042 invocationCount++; 2043 this.failureCause = cause; 2044 } 2045 } 2046 allowTransportConnected()2047 private void allowTransportConnected() { 2048 delayConnectedCallback.allowConnected(); 2049 } 2050 shutdownAndVerify()2051 private void shutdownAndVerify() { 2052 clientTransport.shutdown(SHUTDOWN_REASON); 2053 assertEquals(0, activeStreamCount()); 2054 try { 2055 verify(frameWriter, timeout(TIME_OUT_MS)).close(); 2056 } catch (IOException e) { 2057 throw new RuntimeException(e); 2058 2059 } 2060 frameReader.assertClosed(); 2061 } 2062 2063 private static class DelayConnectedCallback implements Runnable { 2064 SettableFuture<Void> delayed = SettableFuture.create(); 2065 2066 @Override run()2067 public void run() { 2068 Futures.getUnchecked(delayed); 2069 } 2070 allowConnected()2071 void allowConnected() { 2072 delayed.set(null); 2073 } 2074 } 2075 getTransportStats(InternalInstrumented<SocketStats> obj)2076 private static TransportStats getTransportStats(InternalInstrumented<SocketStats> obj) 2077 throws ExecutionException, InterruptedException { 2078 return obj.getStats().get().data; 2079 } 2080 } 2081