• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.cronet;
18 
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertTrue;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.eq;
24 import static org.mockito.Matchers.isA;
25 import static org.mockito.Mockito.atLeast;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.when;
30 
31 import com.google.common.io.BaseEncoding;
32 import io.grpc.CallOptions;
33 import io.grpc.Metadata;
34 import io.grpc.MethodDescriptor;
35 import io.grpc.Status;
36 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
37 import io.grpc.internal.ClientStreamListener;
38 import io.grpc.internal.ClientStreamListener.RpcProgress;
39 import io.grpc.internal.GrpcUtil;
40 import io.grpc.internal.StatsTraceContext;
41 import io.grpc.internal.StreamListener.MessageProducer;
42 import io.grpc.internal.TransportTracer;
43 import io.grpc.internal.WritableBuffer;
44 import io.grpc.testing.TestMethodDescriptors;
45 import java.io.ByteArrayInputStream;
46 import java.nio.ByteBuffer;
47 import java.nio.charset.Charset;
48 import java.util.ArrayList;
49 import java.util.HashMap;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.concurrent.Executor;
53 import org.chromium.net.BidirectionalStream;
54 import org.chromium.net.CronetException;
55 import org.chromium.net.ExperimentalBidirectionalStream;
56 import org.chromium.net.UrlResponseInfo;
57 import org.chromium.net.impl.UrlResponseInfoImpl;
58 import org.junit.Before;
59 import org.junit.Test;
60 import org.junit.runner.RunWith;
61 import org.mockito.ArgumentCaptor;
62 import org.mockito.Mock;
63 import org.mockito.MockitoAnnotations;
64 import org.robolectric.RobolectricTestRunner;
65 
66 @RunWith(RobolectricTestRunner.class)
67 public final class CronetClientStreamTest {
68 
69   @Mock private CronetClientTransport transport;
70   private Metadata metadata = new Metadata();
71   @Mock private StreamBuilderFactory factory;
72   @Mock private ExperimentalBidirectionalStream cronetStream;
73   @Mock private Executor executor;
74   @Mock private ClientStreamListener clientListener;
75   @Mock private ExperimentalBidirectionalStream.Builder builder;
76   private final Object lock = new Object();
77   private final TransportTracer transportTracer = TransportTracer.getDefaultFactory().create();
78   CronetClientStream clientStream;
79 
80   private MethodDescriptor.Marshaller<Void> marshaller = TestMethodDescriptors.voidMarshaller();
81 
82   private MethodDescriptor<?, ?> method = TestMethodDescriptors.voidMethod();
83 
84   private static class SetStreamFactoryRunnable implements Runnable {
85     private final StreamBuilderFactory factory;
86     private CronetClientStream stream;
87 
SetStreamFactoryRunnable(StreamBuilderFactory factory)88     SetStreamFactoryRunnable(StreamBuilderFactory factory) {
89       this.factory = factory;
90     }
91 
setStream(CronetClientStream stream)92     void setStream(CronetClientStream stream) {
93       this.stream = stream;
94     }
95 
96     @Override
run()97     public void run() {
98       assertTrue(stream != null);
99       stream.transportState().start(factory);
100     }
101   }
102 
103   @Before
setUp()104   public void setUp() {
105     MockitoAnnotations.initMocks(this);
106 
107     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
108     clientStream =
109         new CronetClientStream(
110             "https://www.google.com:443",
111             "cronet",
112             executor,
113             metadata,
114             transport,
115             callback,
116             lock,
117             100,
118             false /* alwaysUsePut */,
119             method,
120             StatsTraceContext.NOOP,
121             CallOptions.DEFAULT,
122             transportTracer);
123     callback.setStream(clientStream);
124     when(factory.newBidirectionalStreamBuilder(
125             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
126         .thenReturn(builder);
127     when(builder.build()).thenReturn(cronetStream);
128     clientStream.start(clientListener);
129   }
130 
131   @Test
startStream()132   public void startStream() {
133     verify(factory)
134         .newBidirectionalStreamBuilder(
135             eq("https://www.google.com:443"),
136             isA(BidirectionalStream.Callback.class),
137             eq(executor));
138     verify(builder).build();
139     // At least content type and trailer headers are set.
140     verify(builder, atLeast(2)).addHeader(isA(String.class), isA(String.class));
141     // addRequestAnnotation should only be called when we explicitly add the CRONET_ANNOTATION_KEY
142     // to CallOptions.
143     verify(builder, times(0)).addRequestAnnotation(isA(Object.class));
144     verify(builder, times(0)).setHttpMethod(any(String.class));
145     verify(cronetStream).start();
146   }
147 
148   @Test
write()149   public void write() {
150     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
151         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
152     verify(factory)
153         .newBidirectionalStreamBuilder(
154             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
155     BidirectionalStream.Callback callback = callbackCaptor.getValue();
156 
157     // Create 5 frames to send.
158     CronetWritableBufferAllocator allocator = new CronetWritableBufferAllocator();
159     String[] requests = new String[5];
160     WritableBuffer[] buffers = new WritableBuffer[5];
161     for (int i = 0; i < 5; ++i) {
162       requests[i] = new String("request" + String.valueOf(i));
163       buffers[i] = allocator.allocate(requests[i].length());
164       buffers[i].write(requests[i].getBytes(Charset.forName("UTF-8")), 0, requests[i].length());
165       // The 3rd and 5th writeFrame calls have flush=true.
166       clientStream.abstractClientStreamSink().writeFrame(buffers[i], false, i == 2 || i == 4, 1);
167     }
168     // BidirectionalStream.write is not called because stream is not ready yet.
169     verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
170 
171     // Stream is ready.
172     callback.onStreamReady(cronetStream);
173     // 5 writes are called.
174     verify(cronetStream, times(5)).write(isA(ByteBuffer.class), eq(false));
175     ByteBuffer fakeBuffer = ByteBuffer.allocateDirect(8);
176     fakeBuffer.position(8);
177     verify(cronetStream, times(2)).flush();
178 
179     // 5 onWriteCompleted callbacks for previous writes.
180     callback.onWriteCompleted(cronetStream, null, fakeBuffer, false);
181     callback.onWriteCompleted(cronetStream, null, fakeBuffer, false);
182     callback.onWriteCompleted(cronetStream, null, fakeBuffer, false);
183     callback.onWriteCompleted(cronetStream, null, fakeBuffer, false);
184     callback.onWriteCompleted(cronetStream, null, fakeBuffer, false);
185 
186     // All pending data has been sent. onWriteCompleted callback will not trigger any additional
187     // write call.
188     verify(cronetStream, times(5)).write(isA(ByteBuffer.class), eq(false));
189 
190     // Send end of stream. write will be immediately called since stream is ready.
191     clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
192     verify(cronetStream, times(1)).write(isA(ByteBuffer.class), eq(true));
193     verify(cronetStream, times(3)).flush();
194   }
195 
responseHeader(String status)196   private static List<Map.Entry<String, String>> responseHeader(String status) {
197     Map<String, String> headers = new HashMap<String, String>();
198     headers.put(":status", status);
199     headers.put("content-type", "application/grpc");
200     headers.put("test-key", "test-value");
201     List<Map.Entry<String, String>> headerList = new ArrayList<Map.Entry<String, String>>(3);
202     for (Map.Entry<String, String> entry : headers.entrySet()) {
203       headerList.add(entry);
204     }
205     return headerList;
206   }
207 
trailers(int status)208   private static List<Map.Entry<String, String>> trailers(int status) {
209     Map<String, String> trailers = new HashMap<String, String>();
210     trailers.put("grpc-status", String.valueOf(status));
211     trailers.put("content-type", "application/grpc");
212     trailers.put("test-trailer-key", "test-trailer-value");
213     List<Map.Entry<String, String>> trailerList = new ArrayList<Map.Entry<String, String>>(3);
214     for (Map.Entry<String, String> entry : trailers.entrySet()) {
215       trailerList.add(entry);
216     }
217     return trailerList;
218   }
219 
createMessageFrame(byte[] bytes)220   private static ByteBuffer createMessageFrame(byte[] bytes) {
221     ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + bytes.length);
222     buffer.put((byte) 0 /* UNCOMPRESSED */);
223     buffer.putInt(bytes.length);
224     buffer.put(bytes);
225     return buffer;
226   }
227 
228   @Test
read()229   public void read() {
230     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
231         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
232     verify(factory)
233         .newBidirectionalStreamBuilder(
234             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
235     BidirectionalStream.Callback callback = callbackCaptor.getValue();
236 
237     // Read is not called until we receive the response header.
238     verify(cronetStream, times(0)).read(isA(ByteBuffer.class));
239     UrlResponseInfo info =
240         new UrlResponseInfoImpl(
241             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
242     callback.onResponseHeadersReceived(cronetStream, info);
243     verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
244     ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
245     verify(clientListener).headersRead(metadataCaptor.capture());
246     // Verify recevied headers.
247     Metadata metadata = metadataCaptor.getValue();
248     assertEquals(
249         "application/grpc",
250         metadata.get(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER)));
251     assertEquals(
252         "test-value", metadata.get(Metadata.Key.of("test-key", Metadata.ASCII_STRING_MARSHALLER)));
253 
254     callback.onReadCompleted(
255         cronetStream,
256         info,
257         (ByteBuffer) createMessageFrame(new String("response1").getBytes(Charset.forName("UTF-8"))),
258         false);
259     // Haven't request any message, so no callback is called here.
260     verify(clientListener, times(0)).messagesAvailable(isA(MessageProducer.class));
261     verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
262     // Request one message
263     clientStream.request(1);
264     verify(clientListener, times(1)).messagesAvailable(isA(MessageProducer.class));
265     verify(cronetStream, times(2)).read(isA(ByteBuffer.class));
266 
267     // BidirectionalStream.read will not be called again after receiving endOfStream(empty buffer).
268     clientStream.request(1);
269     callback.onReadCompleted(cronetStream, info, ByteBuffer.allocate(0), true);
270     verify(clientListener, times(1)).messagesAvailable(isA(MessageProducer.class));
271     verify(cronetStream, times(2)).read(isA(ByteBuffer.class));
272   }
273 
274   @Test
streamSucceeded()275   public void streamSucceeded() {
276     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
277         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
278     verify(factory)
279         .newBidirectionalStreamBuilder(
280             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
281     BidirectionalStream.Callback callback = callbackCaptor.getValue();
282 
283     callback.onStreamReady(cronetStream);
284     verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
285     // Send the first data frame.
286     CronetWritableBufferAllocator allocator = new CronetWritableBufferAllocator();
287     String request = new String("request");
288     WritableBuffer writableBuffer = allocator.allocate(request.length());
289     writableBuffer.write(request.getBytes(Charset.forName("UTF-8")), 0, request.length());
290     clientStream.abstractClientStreamSink().writeFrame(writableBuffer, false, true, 1);
291     ArgumentCaptor<ByteBuffer> bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
292     verify(cronetStream, times(1)).write(bufferCaptor.capture(), isA(Boolean.class));
293     ByteBuffer buffer = bufferCaptor.getValue();
294     buffer.position(request.length());
295     verify(cronetStream, times(1)).flush();
296 
297     // Receive response header
298     clientStream.request(2);
299     UrlResponseInfo info =
300         new UrlResponseInfoImpl(
301             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
302     callback.onResponseHeadersReceived(cronetStream, info);
303     verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
304     // Receive one message
305     callback.onReadCompleted(
306         cronetStream,
307         info,
308         (ByteBuffer) createMessageFrame(new String("response").getBytes(Charset.forName("UTF-8"))),
309         false);
310     verify(clientListener, times(1)).messagesAvailable(isA(MessageProducer.class));
311     verify(cronetStream, times(2)).read(isA(ByteBuffer.class));
312 
313     // Send endOfStream
314     callback.onWriteCompleted(cronetStream, null, buffer, false);
315     clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
316     verify(cronetStream, times(2)).write(isA(ByteBuffer.class), isA(Boolean.class));
317     verify(cronetStream, times(2)).flush();
318 
319     // Receive trailer
320     ((CronetClientStream.BidirectionalStreamCallback) callback).processTrailers(trailers(0));
321     callback.onSucceeded(cronetStream, info);
322 
323     // Verify trailer
324     ArgumentCaptor<Metadata> trailerCaptor = ArgumentCaptor.forClass(Metadata.class);
325     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
326     verify(clientListener)
327         .closed(statusCaptor.capture(), isA(RpcProgress.class), trailerCaptor.capture());
328     // Verify recevied headers.
329     Metadata trailers = trailerCaptor.getValue();
330     Status status = statusCaptor.getValue();
331     assertEquals(
332         "test-trailer-value",
333         trailers.get(Metadata.Key.of("test-trailer-key", Metadata.ASCII_STRING_MARSHALLER)));
334     assertEquals(
335         "application/grpc",
336         trailers.get(Metadata.Key.of("content-type", Metadata.ASCII_STRING_MARSHALLER)));
337     assertTrue(status.isOk());
338   }
339 
340   @Test
streamSucceededWithGrpcError()341   public void streamSucceededWithGrpcError() {
342     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
343         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
344     verify(factory)
345         .newBidirectionalStreamBuilder(
346             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
347     BidirectionalStream.Callback callback = callbackCaptor.getValue();
348 
349     callback.onStreamReady(cronetStream);
350     verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
351     clientStream.abstractClientStreamSink().writeFrame(null, true, true, 1);
352     verify(cronetStream, times(1)).write(isA(ByteBuffer.class), isA(Boolean.class));
353     verify(cronetStream, times(1)).flush();
354 
355     // Receive response header
356     clientStream.request(2);
357     UrlResponseInfo info =
358         new UrlResponseInfoImpl(
359             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
360     callback.onResponseHeadersReceived(cronetStream, info);
361     verify(cronetStream, times(1)).read(isA(ByteBuffer.class));
362 
363     // Receive trailer
364     callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
365     ((CronetClientStream.BidirectionalStreamCallback) callback)
366         .processTrailers(trailers(Status.PERMISSION_DENIED.getCode().value()));
367     callback.onSucceeded(cronetStream, info);
368 
369     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
370     verify(clientListener)
371         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
372     // Verify error status.
373     Status status = statusCaptor.getValue();
374     assertFalse(status.isOk());
375     assertEquals(Status.PERMISSION_DENIED.getCode(), status.getCode());
376   }
377 
378   @Test
streamFailed()379   public void streamFailed() {
380     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
381         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
382     verify(factory)
383         .newBidirectionalStreamBuilder(
384             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
385     BidirectionalStream.Callback callback = callbackCaptor.getValue();
386 
387     // Nothing happens and stream fails
388 
389     CronetException exception = mock(CronetException.class);
390     callback.onFailed(cronetStream, null, exception);
391     verify(transport).finishStream(eq(clientStream), isA(Status.class));
392     // finishStream calls transportReportStatus.
393     clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());
394 
395     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
396     verify(clientListener)
397         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
398     Status status = statusCaptor.getValue();
399     assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
400   }
401 
402   @Test
streamFailedAfterResponseHeaderReceived()403   public void streamFailedAfterResponseHeaderReceived() {
404     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
405         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
406     verify(factory)
407         .newBidirectionalStreamBuilder(
408             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
409     BidirectionalStream.Callback callback = callbackCaptor.getValue();
410 
411     // Receive response header
412     UrlResponseInfo info =
413         new UrlResponseInfoImpl(
414             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
415     callback.onResponseHeadersReceived(cronetStream, info);
416 
417     CronetException exception = mock(CronetException.class);
418     callback.onFailed(cronetStream, info, exception);
419     verify(transport).finishStream(eq(clientStream), isA(Status.class));
420     // finishStream calls transportReportStatus.
421     clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());
422 
423     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
424     verify(clientListener)
425         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
426     Status status = statusCaptor.getValue();
427     assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
428   }
429 
430   @Test
streamFailedAfterTrailerReceived()431   public void streamFailedAfterTrailerReceived() {
432     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
433         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
434     verify(factory)
435         .newBidirectionalStreamBuilder(
436             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
437     BidirectionalStream.Callback callback = callbackCaptor.getValue();
438 
439     // Receive response header
440     UrlResponseInfo info =
441         new UrlResponseInfoImpl(
442             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
443     callback.onResponseHeadersReceived(cronetStream, info);
444 
445     // Report trailer but not endOfStream.
446     ((CronetClientStream.BidirectionalStreamCallback) callback).processTrailers(trailers(0));
447 
448     CronetException exception = mock(CronetException.class);
449     callback.onFailed(cronetStream, info, exception);
450     verify(transport).finishStream(eq(clientStream), isA(Status.class));
451     // finishStream calls transportReportStatus.
452     clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());
453 
454     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
455     verify(clientListener)
456         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
457     Status status = statusCaptor.getValue();
458     // Stream has already finished so OK status should be reported.
459     assertEquals(Status.UNAVAILABLE.getCode(), status.getCode());
460   }
461 
462   @Test
streamFailedAfterTrailerAndEndOfStreamReceived()463   public void streamFailedAfterTrailerAndEndOfStreamReceived() {
464     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
465         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
466     verify(factory)
467         .newBidirectionalStreamBuilder(
468             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
469     BidirectionalStream.Callback callback = callbackCaptor.getValue();
470 
471     // Receive response header
472     UrlResponseInfo info =
473         new UrlResponseInfoImpl(
474             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
475     callback.onResponseHeadersReceived(cronetStream, info);
476 
477     // Report trailer and endOfStream
478     callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
479     ((CronetClientStream.BidirectionalStreamCallback) callback).processTrailers(trailers(0));
480 
481     CronetException exception = mock(CronetException.class);
482     callback.onFailed(cronetStream, info, exception);
483     verify(transport).finishStream(eq(clientStream), isA(Status.class));
484     // finishStream calls transportReportStatus.
485     clientStream.transportState().transportReportStatus(Status.UNAVAILABLE, false, new Metadata());
486 
487     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
488     verify(clientListener)
489         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
490     Status status = statusCaptor.getValue();
491     // Stream has already finished so OK status should be reported.
492     assertEquals(Status.OK.getCode(), status.getCode());
493   }
494 
495   @Test
cancelStream()496   public void cancelStream() {
497     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
498         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
499     verify(factory)
500         .newBidirectionalStreamBuilder(
501             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
502     BidirectionalStream.Callback callback = callbackCaptor.getValue();
503 
504     // Cancel the stream
505     clientStream.cancel(Status.DEADLINE_EXCEEDED);
506     verify(transport, times(0)).finishStream(eq(clientStream), isA(Status.class));
507 
508     callback.onCanceled(cronetStream, null);
509     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
510     verify(transport, times(1)).finishStream(eq(clientStream), statusCaptor.capture());
511     Status status = statusCaptor.getValue();
512     assertEquals(Status.DEADLINE_EXCEEDED.getCode(), status.getCode());
513   }
514 
515   @Test
reportTrailersWhenTrailersReceivedBeforeReadClosed()516   public void reportTrailersWhenTrailersReceivedBeforeReadClosed() {
517     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
518         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
519     verify(factory)
520         .newBidirectionalStreamBuilder(
521             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
522     BidirectionalStream.Callback callback = callbackCaptor.getValue();
523 
524     callback.onStreamReady(cronetStream);
525     UrlResponseInfo info =
526         new UrlResponseInfoImpl(
527             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
528     callback.onResponseHeadersReceived(cronetStream, info);
529     // Receive trailer first
530     ((CronetClientStream.BidirectionalStreamCallback) callback)
531         .processTrailers(trailers(Status.UNAUTHENTICATED.getCode().value()));
532     verify(clientListener, times(0))
533         .closed(isA(Status.class), isA(RpcProgress.class), isA(Metadata.class));
534 
535     // Receive cronet's endOfStream
536     callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
537     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
538     verify(clientListener, times(1))
539         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
540     Status status = statusCaptor.getValue();
541     assertEquals(Status.UNAUTHENTICATED.getCode(), status.getCode());
542   }
543 
544   @Test
reportTrailersWhenTrailersReceivedAfterReadClosed()545   public void reportTrailersWhenTrailersReceivedAfterReadClosed() {
546     ArgumentCaptor<BidirectionalStream.Callback> callbackCaptor =
547         ArgumentCaptor.forClass(BidirectionalStream.Callback.class);
548     verify(factory)
549         .newBidirectionalStreamBuilder(
550             isA(String.class), callbackCaptor.capture(), isA(Executor.class));
551     BidirectionalStream.Callback callback = callbackCaptor.getValue();
552 
553     callback.onStreamReady(cronetStream);
554     UrlResponseInfo info =
555         new UrlResponseInfoImpl(
556             new ArrayList<String>(), 200, "", responseHeader("200"), false, "", "");
557     callback.onResponseHeadersReceived(cronetStream, info);
558     // Receive cronet's endOfStream
559     callback.onReadCompleted(cronetStream, null, ByteBuffer.allocate(0), true);
560     verify(clientListener, times(0))
561         .closed(isA(Status.class), isA(RpcProgress.class), isA(Metadata.class));
562 
563     // Receive trailer
564     ((CronetClientStream.BidirectionalStreamCallback) callback)
565         .processTrailers(trailers(Status.UNAUTHENTICATED.getCode().value()));
566     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
567     verify(clientListener, times(1))
568         .closed(statusCaptor.capture(), isA(RpcProgress.class), isA(Metadata.class));
569     Status status = statusCaptor.getValue();
570     assertEquals(Status.UNAUTHENTICATED.getCode(), status.getCode());
571   }
572 
573   @Test
addCronetRequestAnnotation_deprecated()574   public void addCronetRequestAnnotation_deprecated() {
575     Object annotation = new Object();
576     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
577     CronetClientStream stream =
578         new CronetClientStream(
579             "https://www.google.com:443",
580             "cronet",
581             executor,
582             metadata,
583             transport,
584             callback,
585             lock,
586             100,
587             false /* alwaysUsePut */,
588             method,
589             StatsTraceContext.NOOP,
590             CallOptions.DEFAULT.withOption(CronetCallOptions.CRONET_ANNOTATION_KEY, annotation),
591             transportTracer);
592     callback.setStream(stream);
593     when(factory.newBidirectionalStreamBuilder(
594             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
595         .thenReturn(builder);
596     stream.start(clientListener);
597 
598     // addRequestAnnotation should be called since we add the option CRONET_ANNOTATION_KEY above.
599     verify(builder).addRequestAnnotation(annotation);
600   }
601 
602   @Test
withAnnotation()603   public void withAnnotation() {
604     Object annotation1 = new Object();
605     Object annotation2 = new Object();
606     CallOptions callOptions = CronetCallOptions.withAnnotation(CallOptions.DEFAULT, annotation1);
607     callOptions = CronetCallOptions.withAnnotation(callOptions, annotation2);
608 
609     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
610     CronetClientStream stream =
611         new CronetClientStream(
612             "https://www.google.com:443",
613             "cronet",
614             executor,
615             metadata,
616             transport,
617             callback,
618             lock,
619             100,
620             false /* alwaysUsePut */,
621             method,
622             StatsTraceContext.NOOP,
623             callOptions,
624             transportTracer);
625     callback.setStream(stream);
626     when(factory.newBidirectionalStreamBuilder(
627             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
628         .thenReturn(builder);
629     stream.start(clientListener);
630 
631     verify(builder).addRequestAnnotation(annotation1);
632     verify(builder).addRequestAnnotation(annotation2);
633   }
634 
635   @Test
getUnaryRequest()636   public void getUnaryRequest() {
637     StreamBuilderFactory getFactory = mock(StreamBuilderFactory.class);
638     MethodDescriptor<?, ?> getMethod =
639         MethodDescriptor.<Void, Void>newBuilder()
640             .setType(MethodDescriptor.MethodType.UNARY)
641             .setFullMethodName("/service/method")
642             .setIdempotent(true)
643             .setSafe(true)
644             .setRequestMarshaller(marshaller)
645             .setResponseMarshaller(marshaller)
646             .build();
647     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(getFactory);
648     CronetClientStream stream =
649         new CronetClientStream(
650             "https://www.google.com/service/method",
651             "cronet",
652             executor,
653             metadata,
654             transport,
655             callback,
656             lock,
657             100,
658             false /* alwaysUsePut */,
659             getMethod,
660             StatsTraceContext.NOOP,
661             CallOptions.DEFAULT,
662             transportTracer);
663     callback.setStream(stream);
664     ExperimentalBidirectionalStream.Builder getBuilder =
665         mock(ExperimentalBidirectionalStream.Builder.class);
666     when(getFactory.newBidirectionalStreamBuilder(
667             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
668         .thenReturn(getBuilder);
669     when(getBuilder.build()).thenReturn(cronetStream);
670     stream.start(clientListener);
671 
672     // We will not create BidirectionalStream until we have the full request.
673     verify(getFactory, times(0))
674         .newBidirectionalStreamBuilder(
675             isA(String.class), isA(BidirectionalStream.Callback.class), isA(Executor.class));
676 
677     byte[] msg = "request".getBytes(Charset.forName("UTF-8"));
678     stream.writeMessage(new ByteArrayInputStream(msg));
679     // We still haven't built the stream or sent anything.
680     verify(cronetStream, times(0)).write(isA(ByteBuffer.class), isA(Boolean.class));
681     verify(getFactory, times(0))
682         .newBidirectionalStreamBuilder(
683             isA(String.class), isA(BidirectionalStream.Callback.class), isA(Executor.class));
684 
685     // halfClose will trigger sending.
686     stream.halfClose();
687 
688     // Stream should be built with request payload in the header.
689     ArgumentCaptor<String> urlCaptor = ArgumentCaptor.forClass(String.class);
690     verify(getFactory)
691         .newBidirectionalStreamBuilder(
692             urlCaptor.capture(), isA(BidirectionalStream.Callback.class), isA(Executor.class));
693     verify(getBuilder).setHttpMethod("GET");
694     assertEquals(
695         "https://www.google.com/service/method?" + BaseEncoding.base64().encode(msg),
696         urlCaptor.getValue());
697   }
698 
699   @Test
idempotentMethod_usesHttpPut()700   public void idempotentMethod_usesHttpPut() {
701     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
702     MethodDescriptor<?, ?> idempotentMethod = method.toBuilder().setIdempotent(true).build();
703     CronetClientStream stream =
704         new CronetClientStream(
705             "https://www.google.com:443",
706             "cronet",
707             executor,
708             metadata,
709             transport,
710             callback,
711             lock,
712             100,
713             false /* alwaysUsePut */,
714             idempotentMethod,
715             StatsTraceContext.NOOP,
716             CallOptions.DEFAULT,
717             transportTracer);
718     callback.setStream(stream);
719     ExperimentalBidirectionalStream.Builder builder =
720         mock(ExperimentalBidirectionalStream.Builder.class);
721     when(factory.newBidirectionalStreamBuilder(
722             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
723         .thenReturn(builder);
724     when(builder.build()).thenReturn(cronetStream);
725     stream.start(clientListener);
726 
727     verify(builder).setHttpMethod("PUT");
728   }
729 
730   @Test
alwaysUsePutOption_usesHttpPut()731   public void alwaysUsePutOption_usesHttpPut() {
732     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
733     CronetClientStream stream =
734         new CronetClientStream(
735             "https://www.google.com:443",
736             "cronet",
737             executor,
738             metadata,
739             transport,
740             callback,
741             lock,
742             100,
743             true /* alwaysUsePut */,
744             method,
745             StatsTraceContext.NOOP,
746             CallOptions.DEFAULT,
747             transportTracer);
748     callback.setStream(stream);
749     ExperimentalBidirectionalStream.Builder builder =
750         mock(ExperimentalBidirectionalStream.Builder.class);
751     when(factory.newBidirectionalStreamBuilder(
752             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
753         .thenReturn(builder);
754     when(builder.build()).thenReturn(cronetStream);
755     stream.start(clientListener);
756 
757     verify(builder).setHttpMethod("PUT");
758   }
759 
760   @Test
reservedHeadersStripped()761   public void reservedHeadersStripped() {
762     String userAgent = "cronet";
763     Metadata headers = new Metadata();
764     Metadata.Key<String> userKey = Metadata.Key.of("user-key", Metadata.ASCII_STRING_MARSHALLER);
765     headers.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed");
766     headers.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed");
767     headers.put(GrpcUtil.TE_HEADER, "to-be-removed");
768     headers.put(userKey, "user-value");
769 
770     SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
771     CronetClientStream stream =
772         new CronetClientStream(
773             "https://www.google.com:443",
774             userAgent,
775             executor,
776             headers,
777             transport,
778             callback,
779             lock,
780             100,
781             false /* alwaysUsePut */,
782             method,
783             StatsTraceContext.NOOP,
784             CallOptions.DEFAULT,
785             transportTracer);
786     callback.setStream(stream);
787     ExperimentalBidirectionalStream.Builder builder =
788         mock(ExperimentalBidirectionalStream.Builder.class);
789     when(factory.newBidirectionalStreamBuilder(
790             any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
791         .thenReturn(builder);
792     when(builder.build()).thenReturn(cronetStream);
793     stream.start(clientListener);
794 
795     verify(builder, times(4)).addHeader(any(String.class), any(String.class));
796     verify(builder).addHeader(GrpcUtil.USER_AGENT_KEY.name(), userAgent);
797     verify(builder).addHeader(GrpcUtil.CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
798     verify(builder).addHeader("te", GrpcUtil.TE_TRAILERS);
799     verify(builder).addHeader(userKey.name(), "user-value");
800   }
801 }
802