/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.cronet;

import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static io.grpc.internal.GrpcUtil.TE_HEADER;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;

// TODO(ericgribkoff): Consider changing from android.util.Log to java logging.
import android.util.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.ReadableBuffers;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportFrameUtil;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List; 52 import java.util.Map; 53 import java.util.concurrent.Executor; 54 import javax.annotation.Nullable; 55 import javax.annotation.concurrent.GuardedBy; 56 import org.chromium.net.BidirectionalStream; 57 import org.chromium.net.CronetException; 58 import org.chromium.net.ExperimentalBidirectionalStream; 59 import org.chromium.net.UrlResponseInfo; 60 61 /** 62 * Client stream for the cronet transport. 63 */ 64 class CronetClientStream extends AbstractClientStream { 65 private static final int READ_BUFFER_CAPACITY = 4 * 1024; 66 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); 67 private static final String LOG_TAG = "grpc-java-cronet"; 68 69 private static volatile boolean loadAddRequestAnnotationAttempted; 70 private static volatile Method addRequestAnnotationMethod; 71 72 @Deprecated 73 static final CallOptions.Key<Object> CRONET_ANNOTATION_KEY = 74 CallOptions.Key.create("cronet-annotation"); 75 76 static final CallOptions.Key<Collection<Object>> CRONET_ANNOTATIONS_KEY = 77 CallOptions.Key.create("cronet-annotations"); 78 79 private final String url; 80 private final String userAgent; 81 private final StatsTraceContext statsTraceCtx; 82 private final Executor executor; 83 private final Metadata headers; 84 private final CronetClientTransport transport; 85 private final Runnable startCallback; 86 @VisibleForTesting 87 final boolean idempotent; 88 private BidirectionalStream stream; 89 private final boolean delayRequestHeader; 90 private final Object annotation; 91 private final Collection<Object> annotations; 92 private final TransportState state; 93 private final Sink sink = new Sink(); 94 private StreamBuilderFactory streamFactory; 95 CronetClientStream( final String url, @Nullable String userAgent, Executor executor, final Metadata headers, CronetClientTransport transport, Runnable startCallback, Object lock, int maxMessageSize, boolean alwaysUsePut, MethodDescriptor<?, ?> method, StatsTraceContext 
statsTraceCtx, CallOptions callOptions, TransportTracer transportTracer, boolean useGetForSafeMethods, boolean usePutForIdempotentMethods)96 CronetClientStream( 97 final String url, 98 @Nullable String userAgent, 99 Executor executor, 100 final Metadata headers, 101 CronetClientTransport transport, 102 Runnable startCallback, 103 Object lock, 104 int maxMessageSize, 105 boolean alwaysUsePut, 106 MethodDescriptor<?, ?> method, 107 StatsTraceContext statsTraceCtx, 108 CallOptions callOptions, 109 TransportTracer transportTracer, 110 boolean useGetForSafeMethods, 111 boolean usePutForIdempotentMethods) { 112 super( 113 new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions, 114 useGetForSafeMethods && method.isSafe()); 115 this.url = Preconditions.checkNotNull(url, "url"); 116 this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent"); 117 this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); 118 this.executor = Preconditions.checkNotNull(executor, "executor"); 119 this.headers = Preconditions.checkNotNull(headers, "headers"); 120 this.transport = Preconditions.checkNotNull(transport, "transport"); 121 this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback"); 122 this.idempotent = (usePutForIdempotentMethods && method.isIdempotent()) || alwaysUsePut; 123 // Only delay flushing header for unary rpcs. 
124 this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY); 125 this.annotation = callOptions.getOption(CRONET_ANNOTATION_KEY); 126 this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY); 127 this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer); 128 129 // Tests expect the "plain" deframer behavior, not MigratingDeframer 130 // https://github.com/grpc/grpc-java/issues/7140 131 optimizeForDirectExecutor(); 132 } 133 134 @Override transportState()135 protected TransportState transportState() { 136 return state; 137 } 138 139 @Override abstractClientStreamSink()140 protected Sink abstractClientStreamSink() { 141 return sink; 142 } 143 144 @Override setAuthority(String authority)145 public void setAuthority(String authority) { 146 throw new UnsupportedOperationException("Cronet does not support overriding authority"); 147 } 148 149 /** 150 * Returns a copy of {@code callOptions} with {@code annotation} included as one of the Cronet 151 * annotation objects. When an RPC is made using a {@link CallOptions} instance returned by this 152 * method, the annotation objects will be attached to the underlying Cronet bidirectional stream. 153 * When the stream finishes, the user can retrieve the annotation objects via {@link 154 * org.chromium.net.RequestFinishedInfo.Listener}. 
155 * 156 * @param annotation the object to attach to the Cronet stream 157 */ withAnnotation(CallOptions callOptions, Object annotation)158 static CallOptions withAnnotation(CallOptions callOptions, Object annotation) { 159 Collection<Object> existingAnnotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY); 160 ArrayList<Object> newAnnotations; 161 if (existingAnnotations == null) { 162 newAnnotations = new ArrayList<>(); 163 } else { 164 newAnnotations = new ArrayList<>(existingAnnotations); 165 } 166 newAnnotations.add(annotation); 167 return callOptions.withOption( 168 CRONET_ANNOTATIONS_KEY, Collections.unmodifiableList(newAnnotations)); 169 } 170 171 class Sink implements AbstractClientStream.Sink { 172 @Override writeHeaders(Metadata metadata, byte[] payload)173 public void writeHeaders(Metadata metadata, byte[] payload) { 174 startCallback.run(); 175 // streamFactory will be set by startCallback, unless the transport is in go-away state 176 if (streamFactory == null) { 177 return; 178 } 179 180 BidirectionalStreamCallback callback = new BidirectionalStreamCallback(); 181 String path = url; 182 if (payload != null) { 183 path += "?" 
+ BaseEncoding.base64().encode(payload); 184 } 185 BidirectionalStream.Builder builder = 186 streamFactory.newBidirectionalStreamBuilder(path, callback, executor); 187 if (payload != null) { 188 builder.setHttpMethod("GET"); 189 } else if (idempotent) { 190 builder.setHttpMethod("PUT"); 191 } 192 if (delayRequestHeader) { 193 builder.delayRequestHeadersUntilFirstFlush(true); 194 } 195 if (annotation != null || annotations != null) { 196 ExperimentalBidirectionalStream.Builder expBidiStreamBuilder = 197 (ExperimentalBidirectionalStream.Builder) builder; 198 if (annotation != null) { 199 addRequestAnnotation(expBidiStreamBuilder, annotation); 200 } 201 if (annotations != null) { 202 for (Object o : annotations) { 203 addRequestAnnotation(expBidiStreamBuilder, o); 204 } 205 } 206 } 207 setGrpcHeaders(builder); 208 stream = builder.build(); 209 stream.start(); 210 } 211 212 @Override writeFrame( WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages)213 public void writeFrame( 214 WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages) { 215 synchronized (state.lock) { 216 if (state.cancelSent) { 217 return; 218 } 219 ByteBuffer byteBuffer; 220 if (buffer != null) { 221 byteBuffer = ((CronetWritableBuffer) buffer).buffer(); 222 ((Buffer) byteBuffer).flip(); 223 } else { 224 byteBuffer = EMPTY_BUFFER; 225 } 226 onSendingBytes(byteBuffer.remaining()); 227 if (!state.streamReady) { 228 state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush)); 229 } else { 230 streamWrite(byteBuffer, endOfStream, flush); 231 } 232 } 233 } 234 235 @Override cancel(Status reason)236 public void cancel(Status reason) { 237 synchronized (state.lock) { 238 if (state.cancelSent) { 239 return; 240 } 241 state.cancelSent = true; 242 state.cancelReason = reason; 243 state.clearPendingData(); 244 if (stream != null) { 245 // Will report stream finish when BidirectionalStreamCallback.onCanceled is called. 
246 stream.cancel(); 247 } else { 248 transport.finishStream(CronetClientStream.this, reason); 249 } 250 } 251 } 252 } 253 254 class TransportState extends Http2ClientStreamTransportState { 255 private final Object lock; 256 @GuardedBy("lock") 257 private Collection<PendingData> pendingData = new ArrayList<PendingData>(); 258 @GuardedBy("lock") 259 private boolean streamReady; 260 @GuardedBy("lock") 261 private boolean cancelSent = false; 262 @GuardedBy("lock") 263 private int bytesPendingProcess; 264 @GuardedBy("lock") 265 private Status cancelReason; 266 @GuardedBy("lock") 267 private boolean readClosed; 268 @GuardedBy("lock") 269 private boolean firstWriteComplete; 270 TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, TransportTracer transportTracer)271 public TransportState( 272 int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, 273 TransportTracer transportTracer) { 274 super(maxMessageSize, statsTraceCtx, transportTracer); 275 this.lock = Preconditions.checkNotNull(lock, "lock"); 276 } 277 278 @GuardedBy("lock") start(StreamBuilderFactory factory)279 public void start(StreamBuilderFactory factory) { 280 streamFactory = factory; 281 } 282 283 @GuardedBy("lock") 284 @Override onStreamAllocated()285 protected void onStreamAllocated() { 286 super.onStreamAllocated(); 287 } 288 289 @GuardedBy("lock") 290 @Override http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)291 protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) { 292 Preconditions.checkNotNull(stream, "stream must not be null"); 293 stream.cancel(); 294 transportReportStatus(status, stopDelivery, trailers); 295 } 296 297 @GuardedBy("lock") 298 @Override deframeFailed(Throwable cause)299 public void deframeFailed(Throwable cause) { 300 http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata()); 301 } 302 303 @Override runOnTransportThread(final Runnable r)304 public void 
runOnTransportThread(final Runnable r) { 305 synchronized (lock) { 306 r.run(); 307 } 308 } 309 310 @GuardedBy("lock") 311 @Override bytesRead(int processedBytes)312 public void bytesRead(int processedBytes) { 313 Preconditions.checkNotNull(stream, "stream must not be null"); 314 bytesPendingProcess -= processedBytes; 315 if (bytesPendingProcess == 0 && !readClosed) { 316 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 317 Log.v(LOG_TAG, "BidirectionalStream.read"); 318 } 319 stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); 320 } 321 } 322 323 @GuardedBy("lock") transportHeadersReceived(Metadata metadata, boolean endOfStream)324 private void transportHeadersReceived(Metadata metadata, boolean endOfStream) { 325 if (endOfStream) { 326 transportTrailersReceived(metadata); 327 } else { 328 transportHeadersReceived(metadata); 329 } 330 } 331 332 @GuardedBy("lock") transportDataReceived(ByteBuffer buffer, boolean endOfStream)333 private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) { 334 bytesPendingProcess += buffer.remaining(); 335 super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream); 336 } 337 338 @GuardedBy("lock") clearPendingData()339 private void clearPendingData() { 340 for (PendingData data : pendingData) { 341 data.buffer.clear(); 342 } 343 pendingData.clear(); 344 } 345 346 @GuardedBy("lock") enqueuePendingData(PendingData data)347 private void enqueuePendingData(PendingData data) { 348 pendingData.add(data); 349 } 350 351 @GuardedBy("lock") writeAllPendingData()352 private void writeAllPendingData() { 353 for (PendingData data : pendingData) { 354 streamWrite(data.buffer, data.endOfStream, data.flush); 355 } 356 pendingData.clear(); 357 } 358 } 359 360 // TODO(ericgribkoff): move header related method to a common place like GrpcUtil. 
isApplicationHeader(String key)361 private static boolean isApplicationHeader(String key) { 362 // Don't allow reserved non HTTP/2 pseudo headers to be added 363 // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character. 364 return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key) 365 && !USER_AGENT_KEY.name().equalsIgnoreCase(key) 366 && !TE_HEADER.name().equalsIgnoreCase(key); 367 } 368 addRequestAnnotation(ExperimentalBidirectionalStream.Builder builder, Object annotation)369 private static void addRequestAnnotation(ExperimentalBidirectionalStream.Builder builder, 370 Object annotation) { 371 if (!loadAddRequestAnnotationAttempted) { 372 synchronized (CronetClientStream.class) { 373 if (!loadAddRequestAnnotationAttempted) { 374 try { 375 addRequestAnnotationMethod = ExperimentalBidirectionalStream.Builder.class 376 .getMethod("addRequestAnnotation", Object.class); 377 } catch (NoSuchMethodException e) { 378 Log.w(LOG_TAG, 379 "Failed to load method ExperimentalBidirectionalStream.Builder.addRequestAnnotation", 380 e); 381 } finally { 382 loadAddRequestAnnotationAttempted = true; 383 } 384 } 385 } 386 } 387 if (addRequestAnnotationMethod != null) { 388 try { 389 addRequestAnnotationMethod.invoke(builder, annotation); 390 } catch (InvocationTargetException e) { 391 throw new RuntimeException(e.getCause() == null ? e.getTargetException() : e.getCause()); 392 } catch (IllegalAccessException e) { 393 Log.w(LOG_TAG, "Failed to add request annotation: " + annotation, e); 394 } 395 } 396 } 397 setGrpcHeaders(BidirectionalStream.Builder builder)398 private void setGrpcHeaders(BidirectionalStream.Builder builder) { 399 // Psuedo-headers are set by cronet. 400 // All non-pseudo headers must come after pseudo headers. 401 // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed. 
402 builder.addHeader(USER_AGENT_KEY.name(), userAgent); 403 builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC); 404 builder.addHeader("te", GrpcUtil.TE_TRAILERS); 405 406 // Now add any application-provided headers. 407 // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between 408 // String and byte array. 409 byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers); 410 for (int i = 0; i < serializedHeaders.length; i += 2) { 411 String key = new String(serializedHeaders[i], Charset.forName("UTF-8")); 412 // TODO(ericgribkoff): log an error or throw an exception 413 if (isApplicationHeader(key)) { 414 String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8")); 415 builder.addHeader(key, value); 416 } 417 } 418 } 419 streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush)420 private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) { 421 if (stream == null) { 422 return; 423 } 424 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 425 Log.v(LOG_TAG, "BidirectionalStream.write"); 426 } 427 stream.write(buffer, endOfStream); 428 if (flush) { 429 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 430 Log.v(LOG_TAG, "BidirectionalStream.flush"); 431 } 432 stream.flush(); 433 } 434 } 435 finishStream(Status status)436 private void finishStream(Status status) { 437 transport.finishStream(this, status); 438 } 439 440 @Override getAttributes()441 public Attributes getAttributes() { 442 return Attributes.EMPTY; 443 } 444 445 class BidirectionalStreamCallback extends BidirectionalStream.Callback { 446 private List<Map.Entry<String, String>> trailerList; 447 448 @Override onStreamReady(BidirectionalStream stream)449 public void onStreamReady(BidirectionalStream stream) { 450 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 451 Log.v(LOG_TAG, "onStreamReady"); 452 } 453 synchronized (state.lock) { 454 // Now that the stream is ready, call the listener's onReady callback if 455 
// appropriate. 456 state.onStreamAllocated(); 457 state.streamReady = true; 458 state.writeAllPendingData(); 459 } 460 } 461 462 @Override onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)463 public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) { 464 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 465 Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList()); 466 Log.v(LOG_TAG, "BidirectionalStream.read"); 467 } 468 reportHeaders(info.getAllHeadersAsList(), false); 469 stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); 470 } 471 472 @Override onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)473 public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, 474 ByteBuffer buffer, boolean endOfStream) { 475 ((Buffer) buffer).flip(); 476 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 477 Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining()); 478 } 479 480 synchronized (state.lock) { 481 state.readClosed = endOfStream; 482 // The endOfStream in gRPC has a different meaning so we always call transportDataReceived 483 // with endOfStream=false. 484 if (buffer.remaining() != 0) { 485 state.transportDataReceived(buffer, false); 486 } 487 } 488 if (endOfStream && trailerList != null) { 489 // Process trailers if we have already received any. 
490 reportHeaders(trailerList, true); 491 } 492 } 493 494 @Override onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)495 public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, 496 ByteBuffer buffer, boolean endOfStream) { 497 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 498 Log.v(LOG_TAG, "onWriteCompleted"); 499 } 500 synchronized (state.lock) { 501 if (!state.firstWriteComplete) { 502 // Cronet API doesn't notify when headers are written to wire, but it occurs before first 503 // onWriteCompleted callback. 504 state.firstWriteComplete = true; 505 statsTraceCtx.clientOutboundHeaders(); 506 } 507 state.onSentBytes(buffer.position()); 508 } 509 } 510 511 @Override onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, UrlResponseInfo.HeaderBlock trailers)512 public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, 513 UrlResponseInfo.HeaderBlock trailers) { 514 processTrailers(trailers.getAsList()); 515 } 516 517 // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be 518 // mocked. 519 @VisibleForTesting processTrailers(List<Map.Entry<String, String>> trailerList)520 void processTrailers(List<Map.Entry<String, String>> trailerList) { 521 this.trailerList = trailerList; 522 boolean readClosed; 523 synchronized (state.lock) { 524 readClosed = state.readClosed; 525 } 526 if (readClosed) { 527 // There's no pending onReadCompleted callback so we can report trailers now. 528 reportHeaders(trailerList, true); 529 } 530 // Otherwise report trailers in onReadCompleted, or onSucceeded. 531 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 532 Log.v(LOG_TAG, "onResponseTrailersReceived. 
Trailer=" + trailerList.toString()); 533 } 534 } 535 536 @Override onSucceeded(BidirectionalStream stream, UrlResponseInfo info)537 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { 538 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 539 Log.v(LOG_TAG, "onSucceeded"); 540 } 541 542 if (!haveTrailersBeenReported()) { 543 if (trailerList != null) { 544 reportHeaders(trailerList, true); 545 } else if (info != null) { 546 reportHeaders(info.getAllHeadersAsList(), true); 547 } else { 548 throw new AssertionError("No response header or trailer"); 549 } 550 } 551 finishStream(toGrpcStatus(info)); 552 } 553 554 @Override onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error)555 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, 556 CronetException error) { 557 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 558 Log.v(LOG_TAG, "onFailed"); 559 } 560 finishStream(Status.UNAVAILABLE.withCause(error)); 561 } 562 563 @Override onCanceled(BidirectionalStream stream, UrlResponseInfo info)564 public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) { 565 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 566 Log.v(LOG_TAG, "onCanceled"); 567 } 568 Status status; 569 synchronized (state.lock) { 570 if (state.cancelReason != null) { 571 status = state.cancelReason; 572 } else if (info != null) { 573 status = toGrpcStatus(info); 574 } else { 575 status = Status.CANCELLED.withDescription("stream cancelled without reason"); 576 } 577 } 578 finishStream(status); 579 } 580 reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream)581 private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) { 582 // TODO(ericgribkoff): create new utility methods to eliminate all these conversions 583 List<String> headerList = new ArrayList<>(); 584 for (Map.Entry<String, String> entry : headers) { 585 headerList.add(entry.getKey()); 586 headerList.add(entry.getValue()); 587 } 588 589 
byte[][] headerValues = new byte[headerList.size()][]; 590 for (int i = 0; i < headerList.size(); i += 2) { 591 headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8")); 592 headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8")); 593 } 594 Metadata metadata = 595 InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues)); 596 synchronized (state.lock) { 597 // There's no pending onReadCompleted callback so we can report trailers now. 598 state.transportHeadersReceived(metadata, endOfStream); 599 } 600 } 601 haveTrailersBeenReported()602 private boolean haveTrailersBeenReported() { 603 synchronized (state.lock) { 604 return trailerList != null && state.readClosed; 605 } 606 } 607 toGrpcStatus(UrlResponseInfo info)608 private Status toGrpcStatus(UrlResponseInfo info) { 609 return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode()); 610 } 611 } 612 613 private static class PendingData { 614 ByteBuffer buffer; 615 boolean endOfStream; 616 boolean flush; 617 PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush)618 PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) { 619 this.buffer = buffer; 620 this.endOfStream = endOfStream; 621 this.flush = flush; 622 } 623 } 624 } 625