/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.cronet;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.io.BaseEncoding;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener.MessageProducer;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.grpc.testing.TestMethodDescriptors;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.chromium.net.BidirectionalStream;
import org.chromium.net.CronetException;
import org.chromium.net.ExperimentalBidirectionalStream;
import org.chromium.net.UrlResponseInfo;
import org.chromium.net.impl.UrlResponseInfoImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.robolectric.RobolectricTestRunner;

/**
 * Unit tests for {@link CronetClientStream}.
 *
 * <p>Every test runs against mocked Cronet objects. {@link #setUp()} creates and starts a default
 * stream ({@link #clientStream}) using the shared {@link #factory} and {@link #builder} mocks;
 * tests that need a differently configured stream build their own via {@link #newStream}.
 */
@RunWith(RobolectricTestRunner.class)
public final class CronetClientStreamTest {

  private static final Charset UTF_8 = Charset.forName("UTF-8");
  private static final String DEFAULT_URL = "https://www.google.com:443";
  private static final String USER_AGENT = "cronet";

  @Mock private CronetClientTransport transport;
  private final Metadata metadata = new Metadata();
  @Mock private StreamBuilderFactory factory;
  @Mock private ExperimentalBidirectionalStream cronetStream;
  @Mock private Executor executor;
  @Mock private ClientStreamListener clientListener;
  @Mock private ExperimentalBidirectionalStream.Builder builder;
  private final Object lock = new Object();
  private final TransportTracer transportTracer = TransportTracer.getDefaultFactory().create();
  CronetClientStream clientStream;

  private final MethodDescriptor.Marshaller<Void> marshaller =
      TestMethodDescriptors.voidMarshaller();

  private final MethodDescriptor<?, ?> method = TestMethodDescriptors.voidMethod();

  /**
   * Start callback handed to {@link CronetClientStream}; when run, it starts the stream's
   * transport state with the configured factory. The stream must be supplied through
   * {@link #setStream} before the callback runs, because the stream itself takes this callback as
   * a constructor argument.
   */
  private static class SetStreamFactoryRunnable implements Runnable {
    private final StreamBuilderFactory factory;
    private CronetClientStream stream;

    SetStreamFactoryRunnable(StreamBuilderFactory factory) {
      this.factory = factory;
    }

    void setStream(CronetClientStream stream) {
      this.stream = stream;
    }

    @Override
    public void run() {
      assertTrue(stream != null);
      stream.transportState().start(factory);
    }
  }

  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);

    clientStream =
        newStream(
            factory, DEFAULT_URL, metadata, false /* alwaysUsePut */, method, CallOptions.DEFAULT);
    stubStreamBuilder(factory, builder);
    clientStream.start(clientListener);
  }

  /**
   * Creates (but does not start) a {@link CronetClientStream} whose start callback is a
   * {@link SetStreamFactoryRunnable} bound to {@code streamFactory}. The user agent, executor,
   * transport, lock, max message size (100), stats context and transport tracer are the shared
   * test fixtures.
   */
  private CronetClientStream newStream(
      StreamBuilderFactory streamFactory,
      String url,
      Metadata headers,
      boolean alwaysUsePut,
      MethodDescriptor<?, ?> methodDescriptor,
      CallOptions callOptions) {
    SetStreamFactoryRunnable startCallback = new SetStreamFactoryRunnable(streamFactory);
    CronetClientStream stream =
        new CronetClientStream(
            url,
            USER_AGENT,
            executor,
            headers,
            transport,
            startCallback,
            lock,
            100,
            alwaysUsePut,
            methodDescriptor,
            StatsTraceContext.NOOP,
            callOptions,
            transportTracer);
    startCallback.setStream(stream);
    return stream;
  }

  /**
   * Stubs {@code streamFactory} to hand out {@code streamBuilder} for any arguments, and
   * {@code streamBuilder} to build the shared {@link #cronetStream} mock.
   */
  private void stubStreamBuilder(
      StreamBuilderFactory streamFactory, ExperimentalBidirectionalStream.Builder streamBuilder) {
    when(streamFactory.newBidirectionalStreamBuilder(
            any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
        .thenReturn(streamBuilder);
    when(streamBuilder.build()).thenReturn(cronetStream);
  }

  /**
   * Verifies that the shared {@link #factory} was asked for a stream builder and returns the
   * Cronet callback that was passed to it.
   */
  private BidirectionalStream.Callback captureStreamCallback() {
    ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
        ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
    verify(factory)
        .newBidirectionalStreamBuilder(
            isA(String.class), callbackCaptor.capture(), isA(Executor.class));
    return callbackCaptor.getValue();
  }

  /** Builds a response info with HTTP status 200 and the headers from {@link #responseHeader}. */
  private static UrlResponseInfo okResponseInfo() {
    return new UrlResponseInfoImpl(
        new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
  }

  @Test
  public void startStream() {
    verify(factory)
        .newBidirectionalStreamBuilder(
            eq(DEFAULT_URL), isA(BidirectionalStream.Callback.class), eq(executor));
    verify(builder).build();
    // At least content type and trailer headers are set.
    verify(builder, atLeast(2)).addHeader(isA(String.class), isA(String.class));
    // addRequestAnnotation should only be called when we explicitly add the CRONET_ANNOTATION_KEY
    // to CallOptions.
    verify(builder, times(0)).addRequestAnnotation(isA(Object.class));
    verify(builder, times(0)).setHttpMethod(any(String.class));
    verify(cronetStream).start();
  }

  @Test
  public void write() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Create 5 frames to send.
    CronetWritableBufferAllocator allocator = new CronetWritableBufferAllocator();
    for (int i = 0; i < 5; ++i) {
      byte[] payload = ("request" + i).getBytes(UTF_8);
      WritableBuffer frame = allocator.allocate(payload.length);
      frame.write(payload, 0, payload.length);
      // The 3rd and 5th writeFrame calls have flush=true.
      clientStream.abstractClientStreamSink().writeFrame(frame, false, i == 2 || i == 4, 1);
    }
    // BidirectionalStream.write is not called because stream is not ready yet.
    verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));

    // Stream is ready.
    callback.onStreamReady(cronetStream);
    // 5 writes are called.
    verify(cronetStream, times(5)).write(isA(ByteBuffer.class), eq(false));
    ByteBuffer fakeBuffer = ByteBuffer.allocateDirect(8);
    fakeBuffer.position(8);
    verify(cronetStream, times(2)).flush();

    // 5 onWriteCompleted callbacks for previous writes.
    for (int i = 0; i < 5; ++i) {
      callback.onWriteCompleted(cronetStream, null, fakeBuffer, false);
    }

    // All pending data has been sent. onWriteCompleted callback will not trigger any additional
    // write call.
    verify(cronetStream, times(5)).write(isA(ByteBuffer.class), eq(false));

    // Send end of stream. write will be immediately called since stream is ready.
    clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
    verify(cronetStream, times(1)).write(isA(ByteBuffer.class), eq(true));
    verify(cronetStream, times(3)).flush();
  }

  /** Returns response headers carrying the given {@code :status} plus a gRPC content type. */
  private static List<Map.Entry<String, String>> responseHeader(String status) {
    Map<String, String> headers = new HashMap<String, String>();
    headers.put(":status", status);
    headers.put("content-type", "application/grpc");
    headers.put("test-key", "test-value");
    return new ArrayList<Map.Entry<String, String>>(headers.entrySet());
  }

  /** Returns trailers carrying the given numeric {@code grpc-status} plus a test entry. */
  private static List<Map.Entry<String, String>> trailers(int status) {
    Map<String, String> trailers = new HashMap<String, String>();
    trailers.put("grpc-status", String.valueOf(status));
    trailers.put("content-type", "application/grpc");
    trailers.put("test-trailer-key", "test-trailer-value");
    return new ArrayList<Map.Entry<String, String>>(trailers.entrySet());
  }

  /**
   * Wraps {@code bytes} in a message frame: a one-byte "uncompressed" flag, a four-byte length,
   * then the payload. The buffer is returned without being flipped, so its position is at the end
   * of the written data.
   */
  private static ByteBuffer createMessageFrame(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + bytes.length);
    buffer.put((byte) 0 /* UNCOMPRESSED */);
    buffer.putInt(bytes.length);
    buffer.put(bytes);
    return buffer;
  }

  @Test
  public void read() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Read is not called until we receive the response header.
    verify(cronetStream, times(0)).read(isA(ByteBuffer.class));
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);
    verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
    ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
    verify(clientListener).headersRead(metadataCaptor.capture());
    // Verify received headers.
    Metadata receivedHeaders = metadataCaptor.getValue();
    assertEquals(
        "application/grpc",
        receivedHeaders.get(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER)));
    assertEquals(
        "test-value",
        receivedHeaders.get(Metadata.Key.of("test-key", Metadata.ASCII_STRING_MARSHALLER)));

    callback.onReadCompleted(
        cronetStream, info, createMessageFrame("response1".getBytes(UTF_8)), false);
    // No message has been requested yet, so no callback is called here.
    verify(clientListener, times(0)).messagesAvailable(isA(MessageProducer.class));
    verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
    // Request one message
    clientStream.request(1);
    verify(clientListener, times(1)).messagesAvailable(isA(MessageProducer.class));
    verify(cronetStream, times(2)).read(isA(ByteBuffer.class));

    // BidirectionalStream.read will not be called again after receiving endOfStream(empty buffer).
    clientStream.request(1);
    callback.onReadCompleted(cronetStream, info, ByteBuffer.allocate(0), true);
    verify(clientListener, times(1)).messagesAvailable(isA(MessageProducer.class));
    verify(cronetStream, times(2)).read(isA(ByteBuffer.class));
  }

  @Test
  public void streamSucceeded() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    callback.onStreamReady(cronetStream);
    verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
    // Send the first data frame.
    CronetWritableBufferAllocator allocator = new CronetWritableBufferAllocator();
    byte[] requestBytes = "request".getBytes(UTF_8);
    WritableBuffer writableBuffer = allocator.allocate(requestBytes.length);
    writableBuffer.write(requestBytes, 0, requestBytes.length);
    clientStream.abstractClientStreamSink().writeFrame(writableBuffer, false, true, 1);
    ArgumentCaptor<ByteBuffer> bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
    verify(cronetStream, times(1)).write(bufferCaptor.capture(), isA(Boolean.class));
    ByteBuffer buffer = bufferCaptor.getValue();
    buffer.position(requestBytes.length);
    verify(cronetStream, times(1)).flush();

    // Receive response header
    clientStream.request(2);
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);
    verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
    // Receive one message
    callback.onReadCompleted(
        cronetStream, info, createMessageFrame("response".getBytes(UTF_8)), false);
    verify(clientListener, times(1)).messagesAvailable(isA(MessageProducer.class));
    verify(cronetStream, times(2)).read(isA(ByteBuffer.class));

    // Send endOfStream
    callback.onWriteCompleted(cronetStream, null, buffer, false);
    clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
    verify(cronetStream, times(2)).write(isA(ByteBuffer.class), isA(Boolean.class));
    verify(cronetStream, times(2)).flush();

    // Receive trailer
    ((CronetClientStream.BidirectionalStreamCallback) callback).processTrailers(trailers(0));
    callback.onSucceeded(cronetStream, info);

    // Verify trailer
    ArgumentCaptor<Metadata> trailerCaptor = ArgumentCaptor.forClass(Metadata.class);
    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener)
        .closed(statusCaptor.capture(), isA(RpcProgress.class), trailerCaptor.capture());
    // Verify received trailers.
    Metadata receivedTrailers = trailerCaptor.getValue();
    Status status = statusCaptor.getValue();
    assertEquals(
        "test-trailer-value",
        receivedTrailers.get(
            Metadata.Key.of("test-trailer-key", Metadata.ASCII_STRING_MARSHALLER)));
    assertEquals(
        "application/grpc",
        receivedTrailers.get(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER)));
    assertTrue(status.isOk());
  }

  @Test
  public void streamSucceededWithGrpcError() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    callback.onStreamReady(cronetStream);
    verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
    clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
    verify(cronetStream, times(1)).write(isA(ByteBuffer.class), isA(Boolean.class));
    verify(cronetStream, times(1)).flush();

    // Receive response header
    clientStream.request(2);
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);
    verify(cronetStream, times(1)).read(isA(ByteBuffer.class));

    // Receive trailer
    callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
    ((CronetClientStream.BidirectionalStreamCallback) callback)
        .processTrailers(trailers(Status.PERMISSION_DENIED.getCode().value()));
    callback.onSucceeded(cronetStream, info);

    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener)
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    // Verify error status.
    Status status = statusCaptor.getValue();
    assertFalse(status.isOk());
    assertEquals(Status.PERMISSION_DENIED.getCode(), status.getCode());
  }

  @Test
  public void streamFailed() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Nothing happens and stream fails

    CronetException exception = mock(CronetException.class);
    callback.onFailed(cronetStream, null, exception);
    verify(transport).finishStream(eq(clientStream), isA(Status.class));
    // finishStream calls transportReportStatus.
    clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());

    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener)
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    Status status = statusCaptor.getValue();
    assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
  }

  @Test
  public void streamFailedAfterResponseHeaderReceived() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Receive response header
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);

    CronetException exception = mock(CronetException.class);
    callback.onFailed(cronetStream, info, exception);
    verify(transport).finishStream(eq(clientStream), isA(Status.class));
    // finishStream calls transportReportStatus.
    clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());

    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener)
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    Status status = statusCaptor.getValue();
    assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
  }

  @Test
  public void streamFailedAfterTrailerReceived() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Receive response header
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);

    // Report trailer but not endOfStream.
    ((CronetClientStream.BidirectionalStreamCallback) callback).processTrailers(trailers(0));

    CronetException exception = mock(CronetException.class);
    callback.onFailed(cronetStream, info, exception);
    verify(transport).finishStream(eq(clientStream), isA(Status.class));
    // finishStream calls transportReportStatus.
    clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());

    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener)
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    Status status = statusCaptor.getValue();
    // Only the trailers arrived (no endOfStream), so the failure status is what gets reported.
    assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
  }

  @Test
  public void streamFailedAfterTrailerAndEndOfStreamReceived() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Receive response header
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);

    // Report trailer and endOfStream
    callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
    ((CronetClientStream.BidirectionalStreamCallback) callback).processTrailers(trailers(0));

    CronetException exception = mock(CronetException.class);
    callback.onFailed(cronetStream, info, exception);
    verify(transport).finishStream(eq(clientStream), isA(Status.class));
    // finishStream calls transportReportStatus.
    clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());

    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener)
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    Status status = statusCaptor.getValue();
    // Stream has already finished so OK status should be reported.
    assertEquals(Status.OK.getCode(), status.getCode());
  }

  @Test
  public void cancelStream() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    // Cancel the stream
    clientStream.cancel(Status.DEADLINE_EXCEEDED);
    verify(transport, times(0)).finishStream(eq(clientStream), isA(Status.class));

    callback.onCanceled(cronetStream, null);
    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(transport, times(1)).finishStream(eq(clientStream), statusCaptor.capture());
    Status status = statusCaptor.getValue();
    assertEquals(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
  }

  @Test
  public void reportTrailersWhenTrailersReceivedBeforeReadClosed() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    callback.onStreamReady(cronetStream);
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);
    // Receive trailer first
    ((CronetClientStream.BidirectionalStreamCallback) callback)
        .processTrailers(trailers(Status.UNAUTHENTICATED.getCode().value()));
    verify(clientListener, times(0))
        .closed(isA(Status.class), isA(RpcProgress.class), isA(Metadata.class));

    // Receive cronet's endOfStream
    callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener, times(1))
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    Status status = statusCaptor.getValue();
    assertEquals(Status.UNAUTHENTICATED.getCode(), status.getCode());
  }

  @Test
  public void reportTrailersWhenTrailersReceivedAfterReadClosed() {
    BidirectionalStream.Callback callback = captureStreamCallback();

    callback.onStreamReady(cronetStream);
    UrlResponseInfo info = okResponseInfo();
    callback.onResponseHeadersReceived(cronetStream, info);
    // Receive cronet's endOfStream
    callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
    verify(clientListener, times(0))
        .closed(isA(Status.class), isA(RpcProgress.class), isA(Metadata.class));

    // Receive trailer
    ((CronetClientStream.BidirectionalStreamCallback) callback)
        .processTrailers(trailers(Status.UNAUTHENTICATED.getCode().value()));
    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
    verify(clientListener, times(1))
        .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
    Status status = statusCaptor.getValue();
    assertEquals(Status.UNAUTHENTICATED.getCode(), status.getCode());
  }

  @Test
  public void addCronetRequestAnnotation_deprecated() {
    Object annotation = new Object();
    CronetClientStream stream =
        newStream(
            factory,
            DEFAULT_URL,
            metadata,
            false /* alwaysUsePut */,
            method,
            CallOptions.DEFAULT.withOption(CronetCallOptions.CRONET_ANNOTATION_KEY, annotation));
    stubStreamBuilder(factory, builder);
    stream.start(clientListener);

    // addRequestAnnotation should be called since we add the option CRONET_ANNOTATION_KEY above.
    verify(builder).addRequestAnnotation(annotation);
  }

  @Test
  public void withAnnotation() {
    Object annotation1 = new Object();
    Object annotation2 = new Object();
    CallOptions callOptions = CronetCallOptions.withAnnotation(CallOptions.DEFAULT, annotation1);
    callOptions = CronetCallOptions.withAnnotation(callOptions, annotation2);

    CronetClientStream stream =
        newStream(factory, DEFAULT_URL, metadata, false /* alwaysUsePut */, method, callOptions);
    stubStreamBuilder(factory, builder);
    stream.start(clientListener);

    verify(builder).addRequestAnnotation(annotation1);
    verify(builder).addRequestAnnotation(annotation2);
  }

  @Test
  public void getUnaryRequest() {
    StreamBuilderFactory getFactory = mock(StreamBuilderFactory.class);
    MethodDescriptor<?, ?> getMethod =
        MethodDescriptor.<Void, Void>newBuilder()
            .setType(MethodDescriptor.MethodType.UNARY)
            .setFullMethodName("/service/method")
            .setIdempotent(true)
            .setSafe(true)
            .setRequestMarshaller(marshaller)
            .setResponseMarshaller(marshaller)
            .build();
    CronetClientStream stream =
        newStream(
            getFactory,
            "https://www.google.com/service/method",
            metadata,
            false /* alwaysUsePut */,
            getMethod,
            CallOptions.DEFAULT);
    ExperimentalBidirectionalStream.Builder getBuilder =
        mock(ExperimentalBidirectionalStream.Builder.class);
    stubStreamBuilder(getFactory, getBuilder);
    stream.start(clientListener);

    // We will not create BidirectionalStream until we have the full request.
    verify(getFactory, times(0))
        .newBidirectionalStreamBuilder(
            isA(String.class), isA(BidirectionalStream.Callback.class), isA(Executor.class));

    byte[] msg = "request".getBytes(UTF_8);
    stream.writeMessage(new ByteArrayInputStream(msg));
    // We still haven't built the stream or sent anything.
    verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
    verify(getFactory, times(0))
        .newBidirectionalStreamBuilder(
            isA(String.class), isA(BidirectionalStream.Callback.class), isA(Executor.class));

    // halfClose will trigger sending.
    stream.halfClose();

    // Stream should be built with the request payload appended to the URL as a query string.
    ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
    verify(getFactory)
        .newBidirectionalStreamBuilder(
            urlCaptor.capture(), isA(BidirectionalStream.Callback.class), isA(Executor.class));
    verify(getBuilder).setHttpMethod("GET");
    assertEquals(
        "https://www.google.com/service/method?" + BaseEncoding.base64().encode(msg),
        urlCaptor.getValue());
  }

  @Test
  public void idempotentMethod_usesHttpPut() {
    MethodDescriptor<?, ?> idempotentMethod = method.toBuilder().setIdempotent(true).build();
    CronetClientStream stream =
        newStream(
            factory,
            DEFAULT_URL,
            metadata,
            false /* alwaysUsePut */,
            idempotentMethod,
            CallOptions.DEFAULT);
    // Use a fresh builder so invocations made by setUp()'s stream are not counted.
    ExperimentalBidirectionalStream.Builder putBuilder =
        mock(ExperimentalBidirectionalStream.Builder.class);
    stubStreamBuilder(factory, putBuilder);
    stream.start(clientListener);

    verify(putBuilder).setHttpMethod("PUT");
  }

  @Test
  public void alwaysUsePutOption_usesHttpPut() {
    CronetClientStream stream =
        newStream(
            factory, DEFAULT_URL, metadata, true /* alwaysUsePut */, method, CallOptions.DEFAULT);
    // Use a fresh builder so invocations made by setUp()'s stream are not counted.
    ExperimentalBidirectionalStream.Builder putBuilder =
        mock(ExperimentalBidirectionalStream.Builder.class);
    stubStreamBuilder(factory, putBuilder);
    stream.start(clientListener);

    verify(putBuilder).setHttpMethod("PUT");
  }

  @Test
  public void reservedHeadersStripped() {
    Metadata headers = new Metadata();
    Metadata.Key<String> userKey = Metadata.Key.of("user-key", Metadata.ASCII_STRING_MARSHALLER);
    headers.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed");
    headers.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed");
    headers.put(GrpcUtil.TE_HEADER, "to-be-removed");
    headers.put(userKey, "user-value");

    CronetClientStream stream =
        newStream(
            factory, DEFAULT_URL, headers, false /* alwaysUsePut */, method, CallOptions.DEFAULT);
    // Use a fresh builder so the exact header count below only reflects this stream.
    ExperimentalBidirectionalStream.Builder headerBuilder =
        mock(ExperimentalBidirectionalStream.Builder.class);
    stubStreamBuilder(factory, headerBuilder);
    stream.start(clientListener);

    // Three transport-owned headers plus the single user header; none of the "to-be-removed"
    // values may be forwarded.
    verify(headerBuilder, times(4)).addHeader(any(String.class), any(String.class));
    verify(headerBuilder).addHeader(GrpcUtil.USER_AGENT_KEY.name(), USER_AGENT);
    verify(headerBuilder).addHeader(GrpcUtil.CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
    verify(headerBuilder).addHeader("te", GrpcUtil.TE_TRAILERS);
    verify(headerBuilder).addHeader(userKey.name(), "user-value");
  }
}