• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.cronet;
18 
19 import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
20 import static io.grpc.internal.GrpcUtil.TE_HEADER;
21 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
22 
23 // TODO(ericgribkoff): Consider changing from android.util.Log to java logging.
24 import android.util.Log;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Preconditions;
27 import com.google.common.io.BaseEncoding;
28 import io.grpc.Attributes;
29 import io.grpc.CallOptions;
30 import io.grpc.InternalMetadata;
31 import io.grpc.Metadata;
32 import io.grpc.MethodDescriptor;
33 import io.grpc.Status;
34 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
35 import io.grpc.internal.AbstractClientStream;
36 import io.grpc.internal.GrpcUtil;
37 import io.grpc.internal.Http2ClientStreamTransportState;
38 import io.grpc.internal.ReadableBuffers;
39 import io.grpc.internal.StatsTraceContext;
40 import io.grpc.internal.TransportFrameUtil;
41 import io.grpc.internal.TransportTracer;
42 import io.grpc.internal.WritableBuffer;
43 import java.lang.reflect.InvocationTargetException;
44 import java.lang.reflect.Method;
45 import java.nio.Buffer;
46 import java.nio.ByteBuffer;
47 import java.nio.charset.Charset;
48 import java.util.ArrayList;
49 import java.util.Collection;
50 import java.util.Collections;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.concurrent.Executor;
54 import javax.annotation.Nullable;
55 import javax.annotation.concurrent.GuardedBy;
56 import org.chromium.net.BidirectionalStream;
57 import org.chromium.net.CronetException;
58 import org.chromium.net.ExperimentalBidirectionalStream;
59 import org.chromium.net.UrlResponseInfo;
60 
61 /**
62  * Client stream for the cronet transport.
63  */
64 class CronetClientStream extends AbstractClientStream {
65   private static final int READ_BUFFER_CAPACITY = 4 * 1024;
66   private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
67   private static final String LOG_TAG = "grpc-java-cronet";
68 
69   private static volatile boolean loadAddRequestAnnotationAttempted;
70   private static volatile Method addRequestAnnotationMethod;
71 
72   @Deprecated
73   static final CallOptions.Key<Object> CRONET_ANNOTATION_KEY =
74       CallOptions.Key.create("cronet-annotation");
75 
76   static final CallOptions.Key<Collection<Object>> CRONET_ANNOTATIONS_KEY =
77       CallOptions.Key.create("cronet-annotations");
78 
79   private final String url;
80   private final String userAgent;
81   private final StatsTraceContext statsTraceCtx;
82   private final Executor executor;
83   private final Metadata headers;
84   private final CronetClientTransport transport;
85   private final Runnable startCallback;
86   @VisibleForTesting
87   final boolean idempotent;
88   private BidirectionalStream stream;
89   private final boolean delayRequestHeader;
90   private final Object annotation;
91   private final Collection<Object> annotations;
92   private final TransportState state;
93   private final Sink sink = new Sink();
94   private StreamBuilderFactory streamFactory;
95 
CronetClientStream( final String url, @Nullable String userAgent, Executor executor, final Metadata headers, CronetClientTransport transport, Runnable startCallback, Object lock, int maxMessageSize, boolean alwaysUsePut, MethodDescriptor<?, ?> method, StatsTraceContext statsTraceCtx, CallOptions callOptions, TransportTracer transportTracer, boolean useGetForSafeMethods, boolean usePutForIdempotentMethods)96   CronetClientStream(
97       final String url,
98       @Nullable String userAgent,
99       Executor executor,
100       final Metadata headers,
101       CronetClientTransport transport,
102       Runnable startCallback,
103       Object lock,
104       int maxMessageSize,
105       boolean alwaysUsePut,
106       MethodDescriptor<?, ?> method,
107       StatsTraceContext statsTraceCtx,
108       CallOptions callOptions,
109       TransportTracer transportTracer,
110       boolean useGetForSafeMethods,
111       boolean usePutForIdempotentMethods) {
112     super(
113         new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions,
114         useGetForSafeMethods && method.isSafe());
115     this.url = Preconditions.checkNotNull(url, "url");
116     this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");
117     this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
118     this.executor = Preconditions.checkNotNull(executor, "executor");
119     this.headers = Preconditions.checkNotNull(headers, "headers");
120     this.transport = Preconditions.checkNotNull(transport, "transport");
121     this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback");
122     this.idempotent = (usePutForIdempotentMethods && method.isIdempotent()) || alwaysUsePut;
123     // Only delay flushing header for unary rpcs.
124     this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
125     this.annotation = callOptions.getOption(CRONET_ANNOTATION_KEY);
126     this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
127     this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer);
128 
129     // Tests expect the "plain" deframer behavior, not MigratingDeframer
130     // https://github.com/grpc/grpc-java/issues/7140
131     optimizeForDirectExecutor();
132   }
133 
134   @Override
transportState()135   protected TransportState transportState() {
136     return state;
137   }
138 
139   @Override
abstractClientStreamSink()140   protected Sink abstractClientStreamSink() {
141     return sink;
142   }
143 
144   @Override
setAuthority(String authority)145   public void setAuthority(String authority) {
146     throw new UnsupportedOperationException("Cronet does not support overriding authority");
147   }
148 
149   /**
150    * Returns a copy of {@code callOptions} with {@code annotation} included as one of the Cronet
151    * annotation objects. When an RPC is made using a {@link CallOptions} instance returned by this
152    * method, the annotation objects will be attached to the underlying Cronet bidirectional stream.
153    * When the stream finishes, the user can retrieve the annotation objects via {@link
154    * org.chromium.net.RequestFinishedInfo.Listener}.
155    *
156    * @param annotation the object to attach to the Cronet stream
157    */
withAnnotation(CallOptions callOptions, Object annotation)158   static CallOptions withAnnotation(CallOptions callOptions, Object annotation) {
159     Collection<Object> existingAnnotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
160     ArrayList<Object> newAnnotations;
161     if (existingAnnotations == null) {
162       newAnnotations = new ArrayList<>();
163     } else {
164       newAnnotations = new ArrayList<>(existingAnnotations);
165     }
166     newAnnotations.add(annotation);
167     return callOptions.withOption(
168         CRONET_ANNOTATIONS_KEY, Collections.unmodifiableList(newAnnotations));
169   }
170 
171   class Sink implements AbstractClientStream.Sink {
172     @Override
writeHeaders(Metadata metadata, byte[] payload)173     public void writeHeaders(Metadata metadata, byte[] payload) {
174       startCallback.run();
175       // streamFactory will be set by startCallback, unless the transport is in go-away state
176       if (streamFactory == null) {
177         return;
178       }
179 
180       BidirectionalStreamCallback callback = new BidirectionalStreamCallback();
181       String path = url;
182       if (payload != null) {
183         path += "?" + BaseEncoding.base64().encode(payload);
184       }
185       BidirectionalStream.Builder builder =
186           streamFactory.newBidirectionalStreamBuilder(path, callback, executor);
187       if (payload != null) {
188         builder.setHttpMethod("GET");
189       } else if (idempotent) {
190         builder.setHttpMethod("PUT");
191       }
192       if (delayRequestHeader) {
193         builder.delayRequestHeadersUntilFirstFlush(true);
194       }
195       if (annotation != null || annotations != null) {
196         ExperimentalBidirectionalStream.Builder expBidiStreamBuilder =
197             (ExperimentalBidirectionalStream.Builder) builder;
198         if (annotation != null) {
199           addRequestAnnotation(expBidiStreamBuilder, annotation);
200         }
201         if (annotations != null) {
202           for (Object o : annotations) {
203             addRequestAnnotation(expBidiStreamBuilder, o);
204           }
205         }
206       }
207       setGrpcHeaders(builder);
208       stream = builder.build();
209       stream.start();
210     }
211 
212     @Override
writeFrame( WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages)213     public void writeFrame(
214         WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages) {
215       synchronized (state.lock) {
216         if (state.cancelSent) {
217           return;
218         }
219         ByteBuffer byteBuffer;
220         if (buffer != null) {
221           byteBuffer = ((CronetWritableBuffer) buffer).buffer();
222           ((Buffer) byteBuffer).flip();
223         } else {
224           byteBuffer = EMPTY_BUFFER;
225         }
226         onSendingBytes(byteBuffer.remaining());
227         if (!state.streamReady) {
228           state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush));
229         } else {
230           streamWrite(byteBuffer, endOfStream, flush);
231         }
232       }
233     }
234 
235     @Override
cancel(Status reason)236     public void cancel(Status reason) {
237       synchronized (state.lock) {
238         if (state.cancelSent) {
239           return;
240         }
241         state.cancelSent = true;
242         state.cancelReason = reason;
243         state.clearPendingData();
244         if (stream != null) {
245           // Will report stream finish when BidirectionalStreamCallback.onCanceled is called.
246           stream.cancel();
247         } else {
248           transport.finishStream(CronetClientStream.this, reason);
249         }
250       }
251     }
252   }
253 
254   class TransportState extends Http2ClientStreamTransportState {
255     private final Object lock;
256     @GuardedBy("lock")
257     private Collection<PendingData> pendingData = new ArrayList<PendingData>();
258     @GuardedBy("lock")
259     private boolean streamReady;
260     @GuardedBy("lock")
261     private boolean cancelSent = false;
262     @GuardedBy("lock")
263     private int bytesPendingProcess;
264     @GuardedBy("lock")
265     private Status cancelReason;
266     @GuardedBy("lock")
267     private boolean readClosed;
268     @GuardedBy("lock")
269     private boolean firstWriteComplete;
270 
TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, TransportTracer transportTracer)271     public TransportState(
272         int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock,
273         TransportTracer transportTracer) {
274       super(maxMessageSize, statsTraceCtx, transportTracer);
275       this.lock = Preconditions.checkNotNull(lock, "lock");
276     }
277 
278     @GuardedBy("lock")
start(StreamBuilderFactory factory)279     public void start(StreamBuilderFactory factory) {
280       streamFactory = factory;
281     }
282 
283     @GuardedBy("lock")
284     @Override
onStreamAllocated()285     protected void onStreamAllocated() {
286       super.onStreamAllocated();
287     }
288 
289     @GuardedBy("lock")
290     @Override
http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)291     protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
292       Preconditions.checkNotNull(stream, "stream must not be null");
293       stream.cancel();
294       transportReportStatus(status, stopDelivery, trailers);
295     }
296 
297     @GuardedBy("lock")
298     @Override
deframeFailed(Throwable cause)299     public void deframeFailed(Throwable cause) {
300       http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
301     }
302 
303     @Override
runOnTransportThread(final Runnable r)304     public void runOnTransportThread(final Runnable r) {
305       synchronized (lock) {
306         r.run();
307       }
308     }
309 
310     @GuardedBy("lock")
311     @Override
bytesRead(int processedBytes)312     public void bytesRead(int processedBytes) {
313       Preconditions.checkNotNull(stream, "stream must not be null");
314       bytesPendingProcess -= processedBytes;
315       if (bytesPendingProcess == 0 && !readClosed) {
316         if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
317           Log.v(LOG_TAG, "BidirectionalStream.read");
318         }
319         stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
320       }
321     }
322 
323     @GuardedBy("lock")
transportHeadersReceived(Metadata metadata, boolean endOfStream)324     private void transportHeadersReceived(Metadata metadata, boolean endOfStream) {
325       if (endOfStream) {
326         transportTrailersReceived(metadata);
327       } else {
328         transportHeadersReceived(metadata);
329       }
330     }
331 
332     @GuardedBy("lock")
transportDataReceived(ByteBuffer buffer, boolean endOfStream)333     private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) {
334       bytesPendingProcess += buffer.remaining();
335       super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream);
336     }
337 
338     @GuardedBy("lock")
clearPendingData()339     private void clearPendingData() {
340       for (PendingData data : pendingData) {
341         data.buffer.clear();
342       }
343       pendingData.clear();
344     }
345 
346     @GuardedBy("lock")
enqueuePendingData(PendingData data)347     private void enqueuePendingData(PendingData data) {
348       pendingData.add(data);
349     }
350 
351     @GuardedBy("lock")
writeAllPendingData()352     private void writeAllPendingData() {
353       for (PendingData data : pendingData) {
354         streamWrite(data.buffer, data.endOfStream, data.flush);
355       }
356       pendingData.clear();
357     }
358   }
359 
360   // TODO(ericgribkoff): move header related method to a common place like GrpcUtil.
isApplicationHeader(String key)361   private static boolean isApplicationHeader(String key) {
362     // Don't allow reserved non HTTP/2 pseudo headers to be added
363     // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character.
364     return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key)
365         && !USER_AGENT_KEY.name().equalsIgnoreCase(key)
366         && !TE_HEADER.name().equalsIgnoreCase(key);
367   }
368 
addRequestAnnotation(ExperimentalBidirectionalStream.Builder builder, Object annotation)369   private static void addRequestAnnotation(ExperimentalBidirectionalStream.Builder builder,
370       Object annotation) {
371     if (!loadAddRequestAnnotationAttempted) {
372       synchronized (CronetClientStream.class) {
373         if (!loadAddRequestAnnotationAttempted) {
374           try {
375             addRequestAnnotationMethod = ExperimentalBidirectionalStream.Builder.class
376                 .getMethod("addRequestAnnotation", Object.class);
377           } catch (NoSuchMethodException e) {
378             Log.w(LOG_TAG,
379                 "Failed to load method ExperimentalBidirectionalStream.Builder.addRequestAnnotation",
380                 e);
381           } finally {
382             loadAddRequestAnnotationAttempted = true;
383           }
384         }
385       }
386     }
387     if (addRequestAnnotationMethod != null) {
388       try {
389         addRequestAnnotationMethod.invoke(builder, annotation);
390       } catch (InvocationTargetException e) {
391         throw new RuntimeException(e.getCause() == null ? e.getTargetException() : e.getCause());
392       } catch (IllegalAccessException e) {
393         Log.w(LOG_TAG, "Failed to add request annotation: " + annotation, e);
394       }
395     }
396   }
397 
setGrpcHeaders(BidirectionalStream.Builder builder)398   private void setGrpcHeaders(BidirectionalStream.Builder builder) {
399     // Psuedo-headers are set by cronet.
400     // All non-pseudo headers must come after pseudo headers.
401     // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed.
402     builder.addHeader(USER_AGENT_KEY.name(), userAgent);
403     builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
404     builder.addHeader("te", GrpcUtil.TE_TRAILERS);
405 
406     // Now add any application-provided headers.
407     // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between
408     // String and byte array.
409     byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
410     for (int i = 0; i < serializedHeaders.length; i += 2) {
411       String key = new String(serializedHeaders[i], Charset.forName("UTF-8"));
412       // TODO(ericgribkoff): log an error or throw an exception
413       if (isApplicationHeader(key)) {
414         String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8"));
415         builder.addHeader(key, value);
416       }
417     }
418   }
419 
streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush)420   private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) {
421     if (stream == null) {
422       return;
423     }
424     if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
425       Log.v(LOG_TAG, "BidirectionalStream.write");
426     }
427     stream.write(buffer, endOfStream);
428     if (flush) {
429       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
430         Log.v(LOG_TAG, "BidirectionalStream.flush");
431       }
432       stream.flush();
433     }
434   }
435 
finishStream(Status status)436   private void finishStream(Status status) {
437     transport.finishStream(this, status);
438   }
439 
440   @Override
getAttributes()441   public Attributes getAttributes() {
442     return Attributes.EMPTY;
443   }
444 
445   class BidirectionalStreamCallback extends BidirectionalStream.Callback {
446     private List<Map.Entry<String, String>> trailerList;
447 
448     @Override
onStreamReady(BidirectionalStream stream)449     public void onStreamReady(BidirectionalStream stream) {
450       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
451         Log.v(LOG_TAG, "onStreamReady");
452       }
453       synchronized (state.lock) {
454         // Now that the stream is ready, call the listener's onReady callback if
455         // appropriate.
456         state.onStreamAllocated();
457         state.streamReady = true;
458         state.writeAllPendingData();
459       }
460     }
461 
462     @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)463     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
464       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
465         Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList());
466         Log.v(LOG_TAG, "BidirectionalStream.read");
467       }
468       reportHeaders(info.getAllHeadersAsList(), false);
469       stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
470     }
471 
472     @Override
onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)473     public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
474         ByteBuffer buffer, boolean endOfStream) {
475       ((Buffer) buffer).flip();
476       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
477         Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining());
478       }
479 
480       synchronized (state.lock) {
481         state.readClosed = endOfStream;
482         // The endOfStream in gRPC has a different meaning so we always call transportDataReceived
483         // with endOfStream=false.
484         if (buffer.remaining() != 0) {
485           state.transportDataReceived(buffer, false);
486         }
487       }
488       if (endOfStream && trailerList != null) {
489         // Process trailers if we have already received any.
490         reportHeaders(trailerList, true);
491       }
492     }
493 
494     @Override
onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)495     public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
496         ByteBuffer buffer, boolean endOfStream) {
497       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
498         Log.v(LOG_TAG, "onWriteCompleted");
499       }
500       synchronized (state.lock) {
501         if (!state.firstWriteComplete) {
502           // Cronet API doesn't notify when headers are written to wire, but it occurs before first
503           // onWriteCompleted callback.
504           state.firstWriteComplete = true;
505           statsTraceCtx.clientOutboundHeaders();
506         }
507         state.onSentBytes(buffer.position());
508       }
509     }
510 
511     @Override
onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, UrlResponseInfo.HeaderBlock trailers)512     public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
513         UrlResponseInfo.HeaderBlock trailers) {
514       processTrailers(trailers.getAsList());
515     }
516 
517     // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be
518     // mocked.
519     @VisibleForTesting
processTrailers(List<Map.Entry<String, String>> trailerList)520     void processTrailers(List<Map.Entry<String, String>> trailerList) {
521       this.trailerList = trailerList;
522       boolean readClosed;
523       synchronized (state.lock) {
524         readClosed = state.readClosed;
525       }
526       if (readClosed) {
527         // There's no pending onReadCompleted callback so we can report trailers now.
528         reportHeaders(trailerList, true);
529       }
530       // Otherwise report trailers in onReadCompleted, or onSucceeded.
531       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
532         Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString());
533       }
534     }
535 
536     @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)537     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
538       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
539         Log.v(LOG_TAG, "onSucceeded");
540       }
541 
542       if (!haveTrailersBeenReported()) {
543         if (trailerList != null) {
544           reportHeaders(trailerList, true);
545         } else if (info != null) {
546           reportHeaders(info.getAllHeadersAsList(), true);
547         } else {
548           throw new AssertionError("No response header or trailer");
549         }
550       }
551       finishStream(toGrpcStatus(info));
552     }
553 
554     @Override
onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error)555     public void onFailed(BidirectionalStream stream, UrlResponseInfo info,
556         CronetException error) {
557       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
558         Log.v(LOG_TAG, "onFailed");
559       }
560       finishStream(Status.UNAVAILABLE.withCause(error));
561     }
562 
563     @Override
onCanceled(BidirectionalStream stream, UrlResponseInfo info)564     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
565       if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
566         Log.v(LOG_TAG, "onCanceled");
567       }
568       Status status;
569       synchronized (state.lock) {
570         if (state.cancelReason != null) {
571           status = state.cancelReason;
572         } else if (info != null) {
573           status = toGrpcStatus(info);
574         } else {
575           status = Status.CANCELLED.withDescription("stream cancelled without reason");
576         }
577       }
578       finishStream(status);
579     }
580 
reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream)581     private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) {
582       // TODO(ericgribkoff): create new utility methods to eliminate all these conversions
583       List<String> headerList = new ArrayList<>();
584       for (Map.Entry<String, String> entry : headers) {
585         headerList.add(entry.getKey());
586         headerList.add(entry.getValue());
587       }
588 
589       byte[][] headerValues = new byte[headerList.size()][];
590       for (int i = 0; i < headerList.size(); i += 2) {
591         headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8"));
592         headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8"));
593       }
594       Metadata metadata =
595           InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues));
596       synchronized (state.lock) {
597         // There's no pending onReadCompleted callback so we can report trailers now.
598         state.transportHeadersReceived(metadata, endOfStream);
599       }
600     }
601 
haveTrailersBeenReported()602     private boolean haveTrailersBeenReported() {
603       synchronized (state.lock) {
604         return trailerList != null && state.readClosed;
605       }
606     }
607 
toGrpcStatus(UrlResponseInfo info)608     private Status toGrpcStatus(UrlResponseInfo info) {
609       return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode());
610     }
611   }
612 
613   private static class PendingData {
614     ByteBuffer buffer;
615     boolean endOfStream;
616     boolean flush;
617 
PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush)618     PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) {
619       this.buffer = buffer;
620       this.endOfStream = endOfStream;
621       this.flush = flush;
622     }
623   }
624 }
625