• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.cronet;
18 
19 import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
20 import static io.grpc.internal.GrpcUtil.TE_HEADER;
21 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
22 
23 // TODO(ericgribkoff): Consider changing from android.util.Log to java logging.
24 import android.util.Log;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Preconditions;
27 import com.google.common.io.BaseEncoding;
28 import io.grpc.Attributes;
29 import io.grpc.CallOptions;
30 import io.grpc.InternalMetadata;
31 import io.grpc.Metadata;
32 import io.grpc.MethodDescriptor;
33 import io.grpc.Status;
34 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
35 import io.grpc.internal.AbstractClientStream;
36 import io.grpc.internal.GrpcUtil;
37 import io.grpc.internal.Http2ClientStreamTransportState;
38 import io.grpc.internal.ReadableBuffers;
39 import io.grpc.internal.StatsTraceContext;
40 import io.grpc.internal.TransportFrameUtil;
41 import io.grpc.internal.TransportTracer;
42 import io.grpc.internal.WritableBuffer;
43 import java.nio.ByteBuffer;
44 import java.nio.charset.Charset;
45 import java.util.ArrayList;
46 import java.util.Collection;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Queue;
51 import java.util.concurrent.Executor;
52 import javax.annotation.Nullable;
53 import javax.annotation.concurrent.GuardedBy;
54 import org.chromium.net.BidirectionalStream;
55 import org.chromium.net.CronetException;
56 import org.chromium.net.ExperimentalBidirectionalStream;
57 import org.chromium.net.UrlResponseInfo;
58 
59 /**
60  * Client stream for the cronet transport.
61  */
62 class CronetClientStream extends AbstractClientStream {
63   private static final int READ_BUFFER_CAPACITY = 4 * 1024;
64   private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
65   private static final String LOG_TAG = "grpc-java-cronet";
66   private final String url;
67   private final String userAgent;
68   private final StatsTraceContext statsTraceCtx;
69   private final Executor executor;
70   private final Metadata headers;
71   private final CronetClientTransport transport;
72   private final Runnable startCallback;
73   @VisibleForTesting
74   final boolean idempotent;
75   private BidirectionalStream stream;
76   private final boolean delayRequestHeader;
77   private final Object annotation;
78   private final Collection<Object> annotations;
79   private final TransportState state;
80   private final Sink sink = new Sink();
81   private StreamBuilderFactory streamFactory;
82 
/**
 * Creates a client stream that will run over a Cronet {@code BidirectionalStream}.
 *
 * <p>No network activity happens here; the Cronet stream is built and started later, in
 * {@code Sink.writeHeaders}.
 *
 * @param url request URL handed to the Cronet stream builder; must not be null
 * @param userAgent value sent as the user-agent header; must not be null (it is null-checked
 *     below, so the parameter is deliberately not annotated {@code @Nullable})
 * @param executor executor handed to the Cronet stream builder; must not be null
 * @param headers application metadata to send with the request; must not be null
 * @param transport owning transport, notified when the stream finishes; must not be null
 * @param startCallback run at the beginning of {@code Sink.writeHeaders}; must not be null
 * @param lock monitor used by {@link TransportState} to guard its mutable state
 * @param maxMessageSize forwarded to {@link TransportState}
 * @param alwaysUsePut when true the stream is treated as idempotent regardless of the method
 * @param method descriptor of the RPC; its safe/idempotent flags and type are consulted
 * @param statsTraceCtx stats/tracing context; must not be null
 * @param callOptions source of the optional Cronet request annotations
 * @param transportTracer forwarded to the superclass and to {@link TransportState}
 * @throws NullPointerException if any argument documented as "must not be null" is null
 */
CronetClientStream(
    final String url,
    String userAgent,
    Executor executor,
    final Metadata headers,
    CronetClientTransport transport,
    Runnable startCallback,
    Object lock,
    int maxMessageSize,
    boolean alwaysUsePut,
    MethodDescriptor<?, ?> method,
    StatsTraceContext statsTraceCtx,
    CallOptions callOptions,
    TransportTracer transportTracer) {
  super(
      new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers,
      method.isSafe());
  this.url = Preconditions.checkNotNull(url, "url");
  this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");
  this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
  this.executor = Preconditions.checkNotNull(executor, "executor");
  this.headers = Preconditions.checkNotNull(headers, "headers");
  this.transport = Preconditions.checkNotNull(transport, "transport");
  this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback");
  this.idempotent = method.isIdempotent() || alwaysUsePut;
  // Only delay flushing header for unary rpcs.
  this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
  this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY);
  this.annotations = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATIONS_KEY);
  this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer);
}
114 
115   @Override
transportState()116   protected TransportState transportState() {
117     return state;
118   }
119 
120   @Override
abstractClientStreamSink()121   protected Sink abstractClientStreamSink() {
122     return sink;
123   }
124 
125   @Override
setAuthority(String authority)126   public void setAuthority(String authority) {
127     throw new UnsupportedOperationException("Cronet does not support overriding authority");
128   }
129 
130   class Sink implements AbstractClientStream.Sink {
131     @Override
writeHeaders(Metadata metadata, byte[] payload)132     public void writeHeaders(Metadata metadata, byte[] payload) {
133       startCallback.run();
134 
135       BidirectionalStreamCallback callback = new BidirectionalStreamCallback();
136       String path = url;
137       if (payload != null) {
138         path += "?" + BaseEncoding.base64().encode(payload);
139       }
140       BidirectionalStream.Builder builder =
141           streamFactory.newBidirectionalStreamBuilder(path, callback, executor);
142       if (payload != null) {
143         builder.setHttpMethod("GET");
144       } else if (idempotent) {
145         builder.setHttpMethod("PUT");
146       }
147       if (delayRequestHeader) {
148         builder.delayRequestHeadersUntilFirstFlush(true);
149       }
150       if (annotation != null) {
151         ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(annotation);
152       }
153       if (annotations != null) {
154         for (Object o : annotations) {
155           ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(o);
156         }
157       }
158       setGrpcHeaders(builder);
159       stream = builder.build();
160       stream.start();
161     }
162 
163     @Override
writeFrame( WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages)164     public void writeFrame(
165         WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages) {
166       synchronized (state.lock) {
167         if (state.cancelSent) {
168           return;
169         }
170         ByteBuffer byteBuffer;
171         if (buffer != null) {
172           byteBuffer = ((CronetWritableBuffer) buffer).buffer();
173           byteBuffer.flip();
174         } else {
175           byteBuffer = EMPTY_BUFFER;
176         }
177         onSendingBytes(byteBuffer.remaining());
178         if (!state.streamReady) {
179           state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush));
180         } else {
181           streamWrite(byteBuffer, endOfStream, flush);
182         }
183       }
184     }
185 
186     @Override
request(final int numMessages)187     public void request(final int numMessages) {
188       synchronized (state.lock) {
189         state.requestMessagesFromDeframer(numMessages);
190       }
191     }
192 
193     @Override
cancel(Status reason)194     public void cancel(Status reason) {
195       synchronized (state.lock) {
196         if (state.cancelSent) {
197           return;
198         }
199         state.cancelSent = true;
200         state.cancelReason = reason;
201         state.clearPendingData();
202         if (stream != null) {
203           // Will report stream finish when BidirectionalStreamCallback.onCanceled is called.
204           stream.cancel();
205         } else {
206           transport.finishStream(CronetClientStream.this, reason);
207         }
208       }
209     }
210   }
211 
212   class TransportState extends Http2ClientStreamTransportState {
213     private final Object lock;
214     @GuardedBy("lock")
215     private Queue<PendingData> pendingData = new LinkedList<PendingData>();
216     @GuardedBy("lock")
217     private boolean streamReady;
218     @GuardedBy("lock")
219     private boolean cancelSent = false;
220     @GuardedBy("lock")
221     private int bytesPendingProcess;
222     @GuardedBy("lock")
223     private Status cancelReason;
224     @GuardedBy("lock")
225     private boolean readClosed;
226     @GuardedBy("lock")
227     private boolean firstWriteComplete;
228 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, TransportTracer transportTracer)229     public TransportState(
230         int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock,
231         TransportTracer transportTracer) {
232       super(maxMessageSize, statsTraceCtx, transportTracer);
233       this.lock = Preconditions.checkNotNull(lock, "lock");
234     }
235 
236     @GuardedBy("lock")
start(StreamBuilderFactory factory)237     public void start(StreamBuilderFactory factory) {
238       streamFactory = factory;
239     }
240 
241     @GuardedBy("lock")
242     @Override
onStreamAllocated()243     protected void onStreamAllocated() {
244       super.onStreamAllocated();
245     }
246 
247     @GuardedBy("lock")
248     @Override
http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)249     protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
250       stream.cancel();
251       transportReportStatus(status, stopDelivery, trailers);
252     }
253 
254     @GuardedBy("lock")
255     @Override
deframeFailed(Throwable cause)256     public void deframeFailed(Throwable cause) {
257       http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
258     }
259 
260     @Override
runOnTransportThread(final Runnable r)261     public void runOnTransportThread(final Runnable r) {
262       synchronized (lock) {
263         r.run();
264       }
265     }
266 
267     @GuardedBy("lock")
268     @Override
bytesRead(int processedBytes)269     public void bytesRead(int processedBytes) {
270       bytesPendingProcess -= processedBytes;
271       if (bytesPendingProcess == 0 && !readClosed) {
272         if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
273           Log.v(LOG_TAG, "BidirectionalStream.read");
274         }
275         stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
276       }
277     }
278 
279     @GuardedBy("lock")
transportHeadersReceived(Metadata metadata, boolean endOfStream)280     private void transportHeadersReceived(Metadata metadata, boolean endOfStream) {
281       if (endOfStream) {
282         transportTrailersReceived(metadata);
283       } else {
284         transportHeadersReceived(metadata);
285       }
286     }
287 
288     @GuardedBy("lock")
transportDataReceived(ByteBuffer buffer, boolean endOfStream)289     private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) {
290       bytesPendingProcess += buffer.remaining();
291       super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream);
292     }
293 
294     @GuardedBy("lock")
clearPendingData()295     private void clearPendingData() {
296       for (PendingData data : pendingData) {
297         data.buffer.clear();
298       }
299       pendingData.clear();
300     }
301 
302     @GuardedBy("lock")
enqueuePendingData(PendingData data)303     private void enqueuePendingData(PendingData data) {
304       pendingData.add(data);
305     }
306 
307     @GuardedBy("lock")
writeAllPendingData()308     private void writeAllPendingData() {
309       for (PendingData data : pendingData) {
310         streamWrite(data.buffer, data.endOfStream, data.flush);
311       }
312       pendingData.clear();
313     }
314   }
315 
316   // TODO(ericgribkoff): move header related method to a common place like GrpcUtil.
isApplicationHeader(String key)317   private static boolean isApplicationHeader(String key) {
318     // Don't allow reserved non HTTP/2 pseudo headers to be added
319     // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character.
320     return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key)
321         && !USER_AGENT_KEY.name().equalsIgnoreCase(key)
322         && !TE_HEADER.name().equalsIgnoreCase(key);
323   }
324 
setGrpcHeaders(BidirectionalStream.Builder builder)325   private void setGrpcHeaders(BidirectionalStream.Builder builder) {
326     // Psuedo-headers are set by cronet.
327     // All non-pseudo headers must come after pseudo headers.
328     // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed.
329     builder.addHeader(USER_AGENT_KEY.name(), userAgent);
330     builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
331     builder.addHeader("te", GrpcUtil.TE_TRAILERS);
332 
333     // Now add any application-provided headers.
334     // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between
335     // String and byte array.
336     byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
337     for (int i = 0; i < serializedHeaders.length; i += 2) {
338       String key = new String(serializedHeaders[i], Charset.forName("UTF-8"));
339       // TODO(ericgribkoff): log an error or throw an exception
340       if (isApplicationHeader(key)) {
341         String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8"));
342         builder.addHeader(key, value);
343       }
344     }
345   }
346 
streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush)347   private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) {
348     if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
349       Log.v(LOG_TAG, "BidirectionalStream.write");
350     }
351     stream.write(buffer, endOfStream);
352     if (flush) {
353       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
354         Log.v(LOG_TAG, "BidirectionalStream.flush");
355       }
356       stream.flush();
357     }
358   }
359 
finishStream(Status status)360   private void finishStream(Status status) {
361     transport.finishStream(this, status);
362   }
363 
364   @Override
getAttributes()365   public Attributes getAttributes() {
366     return Attributes.EMPTY;
367   }
368 
369   class BidirectionalStreamCallback extends BidirectionalStream.Callback {
370     private List<Map.Entry<String, String>> trailerList;
371 
372     @Override
onStreamReady(BidirectionalStream stream)373     public void onStreamReady(BidirectionalStream stream) {
374       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
375         Log.v(LOG_TAG, "onStreamReady");
376       }
377       synchronized (state.lock) {
378         // Now that the stream is ready, call the listener's onReady callback if
379         // appropriate.
380         state.onStreamAllocated();
381         state.streamReady = true;
382         state.writeAllPendingData();
383       }
384     }
385 
386     @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)387     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
388       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
389         Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList());
390         Log.v(LOG_TAG, "BidirectionalStream.read");
391       }
392       reportHeaders(info.getAllHeadersAsList(), false);
393       stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
394     }
395 
396     @Override
onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)397     public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
398         ByteBuffer buffer, boolean endOfStream) {
399       buffer.flip();
400       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
401         Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining());
402       }
403 
404       synchronized (state.lock) {
405         state.readClosed = endOfStream;
406         // The endOfStream in gRPC has a different meaning so we always call transportDataReceived
407         // with endOfStream=false.
408         if (buffer.remaining() != 0) {
409           state.transportDataReceived(buffer, false);
410         }
411       }
412       if (endOfStream && trailerList != null) {
413         // Process trailers if we have already received any.
414         reportHeaders(trailerList, true);
415       }
416     }
417 
418     @Override
onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)419     public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
420         ByteBuffer buffer, boolean endOfStream) {
421       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
422         Log.v(LOG_TAG, "onWriteCompleted");
423       }
424       synchronized (state.lock) {
425         if (!state.firstWriteComplete) {
426           // Cronet API doesn't notify when headers are written to wire, but it occurs before first
427           // onWriteCompleted callback.
428           state.firstWriteComplete = true;
429           statsTraceCtx.clientOutboundHeaders();
430         }
431         state.onSentBytes(buffer.position());
432       }
433     }
434 
435     @Override
onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, UrlResponseInfo.HeaderBlock trailers)436     public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
437         UrlResponseInfo.HeaderBlock trailers) {
438       processTrailers(trailers.getAsList());
439     }
440 
441     // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be
442     // mocked.
443     @VisibleForTesting
processTrailers(List<Map.Entry<String, String>> trailerList)444     void processTrailers(List<Map.Entry<String, String>> trailerList) {
445       this.trailerList = trailerList;
446       boolean readClosed;
447       synchronized (state.lock) {
448         readClosed = state.readClosed;
449       }
450       if (readClosed) {
451         // There's no pending onReadCompleted callback so we can report trailers now.
452         reportHeaders(trailerList, true);
453       }
454       // Otherwise report trailers in onReadCompleted, or onSucceeded.
455       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
456         Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString());
457       }
458     }
459 
460     @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)461     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
462       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
463         Log.v(LOG_TAG, "onSucceeded");
464       }
465 
466       if (!haveTrailersBeenReported()) {
467         if (trailerList != null) {
468           reportHeaders(trailerList, true);
469         } else if (info != null) {
470           reportHeaders(info.getAllHeadersAsList(), true);
471         } else {
472           throw new AssertionError("No response header or trailer");
473         }
474       }
475       finishStream(toGrpcStatus(info));
476     }
477 
478     @Override
onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error)479     public void onFailed(BidirectionalStream stream, UrlResponseInfo info,
480         CronetException error) {
481       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
482         Log.v(LOG_TAG, "onFailed");
483       }
484       finishStream(Status.UNAVAILABLE.withCause(error));
485     }
486 
487     @Override
onCanceled(BidirectionalStream stream, UrlResponseInfo info)488     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
489       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
490         Log.v(LOG_TAG, "onCanceled");
491       }
492       Status status;
493       synchronized (state.lock) {
494         if (state.cancelReason != null) {
495           status = state.cancelReason;
496         } else if (info != null) {
497           status = toGrpcStatus(info);
498         } else {
499           status = Status.CANCELLED.withDescription("stream cancelled without reason");
500         }
501       }
502       finishStream(status);
503     }
504 
reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream)505     private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) {
506       // TODO(ericgribkoff): create new utility methods to eliminate all these conversions
507       List<String> headerList = new ArrayList<>();
508       for (Map.Entry<String, String> entry : headers) {
509         headerList.add(entry.getKey());
510         headerList.add(entry.getValue());
511       }
512 
513       byte[][] headerValues = new byte[headerList.size()][];
514       for (int i = 0; i < headerList.size(); i += 2) {
515         headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8"));
516         headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8"));
517       }
518       Metadata metadata =
519           InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues));
520       synchronized (state.lock) {
521         // There's no pending onReadCompleted callback so we can report trailers now.
522         state.transportHeadersReceived(metadata, endOfStream);
523       }
524     }
525 
haveTrailersBeenReported()526     private boolean haveTrailersBeenReported() {
527       synchronized (state.lock) {
528         return trailerList != null && state.readClosed;
529       }
530     }
531 
toGrpcStatus(UrlResponseInfo info)532     private Status toGrpcStatus(UrlResponseInfo info) {
533       return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode());
534     }
535   }
536 
537   private static class PendingData {
538     ByteBuffer buffer;
539     boolean endOfStream;
540     boolean flush;
541 
PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush)542     PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) {
543       this.buffer = buffer;
544       this.endOfStream = endOfStream;
545       this.flush = flush;
546     }
547   }
548 }
549