1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.cronet; 18 19 import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY; 20 import static io.grpc.internal.GrpcUtil.TE_HEADER; 21 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; 22 23 // TODO(ericgribkoff): Consider changing from android.util.Log to java logging. 24 import android.util.Log; 25 import com.google.common.annotations.VisibleForTesting; 26 import com.google.common.base.Preconditions; 27 import com.google.common.io.BaseEncoding; 28 import io.grpc.Attributes; 29 import io.grpc.CallOptions; 30 import io.grpc.InternalMetadata; 31 import io.grpc.Metadata; 32 import io.grpc.MethodDescriptor; 33 import io.grpc.Status; 34 import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory; 35 import io.grpc.internal.AbstractClientStream; 36 import io.grpc.internal.GrpcUtil; 37 import io.grpc.internal.Http2ClientStreamTransportState; 38 import io.grpc.internal.ReadableBuffers; 39 import io.grpc.internal.StatsTraceContext; 40 import io.grpc.internal.TransportFrameUtil; 41 import io.grpc.internal.TransportTracer; 42 import io.grpc.internal.WritableBuffer; 43 import java.nio.ByteBuffer; 44 import java.nio.charset.Charset; 45 import java.util.ArrayList; 46 import java.util.Collection; 47 import java.util.LinkedList; 48 import java.util.List; 49 import java.util.Map; 50 import java.util.Queue; 51 import java.util.concurrent.Executor; 52 import javax.annotation.Nullable; 53 import javax.annotation.concurrent.GuardedBy; 54 import org.chromium.net.BidirectionalStream; 55 import org.chromium.net.CronetException; 56 import org.chromium.net.ExperimentalBidirectionalStream; 57 import org.chromium.net.UrlResponseInfo; 58 59 /** 60 * Client stream for the cronet transport. 61 */ 62 class CronetClientStream extends AbstractClientStream { 63 private static final int READ_BUFFER_CAPACITY = 4 * 1024; 64 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); 65 private static final String LOG_TAG = "grpc-java-cronet"; 66 private final String url; 67 private final String userAgent; 68 private final StatsTraceContext statsTraceCtx; 69 private final Executor executor; 70 private final Metadata headers; 71 private final CronetClientTransport transport; 72 private final Runnable startCallback; 73 @VisibleForTesting 74 final boolean idempotent; 75 private BidirectionalStream stream; 76 private final boolean delayRequestHeader; 77 private final Object annotation; 78 private final Collection<Object> annotations; 79 private final TransportState state; 80 private final Sink sink = new Sink(); 81 private StreamBuilderFactory streamFactory; 82 CronetClientStream( final String url, @Nullable String userAgent, Executor executor, final Metadata headers, CronetClientTransport transport, Runnable startCallback, Object lock, int maxMessageSize, boolean alwaysUsePut, MethodDescriptor<?, ?> method, StatsTraceContext statsTraceCtx, CallOptions callOptions, TransportTracer transportTracer)83 CronetClientStream( 84 final String url, 85 @Nullable String userAgent, 86 Executor executor, 87 final Metadata headers, 88 CronetClientTransport transport, 89 Runnable startCallback, 90 Object lock, 91 int maxMessageSize, 92 boolean alwaysUsePut, 93 MethodDescriptor<?, ?> method, 94 StatsTraceContext statsTraceCtx, 95 CallOptions callOptions, 96 TransportTracer transportTracer) { 97 super( 98 new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, 99 method.isSafe()); 100 this.url = Preconditions.checkNotNull(url, "url"); 101 this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent"); 102 this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); 103 this.executor = Preconditions.checkNotNull(executor, "executor"); 104 this.headers = Preconditions.checkNotNull(headers, "headers"); 105 this.transport = Preconditions.checkNotNull(transport, "transport"); 106 this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback"); 107 this.idempotent = method.isIdempotent() || alwaysUsePut; 108 // Only delay flushing header for unary rpcs. 109 this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY); 110 this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY); 111 this.annotations = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATIONS_KEY); 112 this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer); 113 } 114 115 @Override transportState()116 protected TransportState transportState() { 117 return state; 118 } 119 120 @Override abstractClientStreamSink()121 protected Sink abstractClientStreamSink() { 122 return sink; 123 } 124 125 @Override setAuthority(String authority)126 public void setAuthority(String authority) { 127 throw new UnsupportedOperationException("Cronet does not support overriding authority"); 128 } 129 130 class Sink implements AbstractClientStream.Sink { 131 @Override writeHeaders(Metadata metadata, byte[] payload)132 public void writeHeaders(Metadata metadata, byte[] payload) { 133 startCallback.run(); 134 135 BidirectionalStreamCallback callback = new BidirectionalStreamCallback(); 136 String path = url; 137 if (payload != null) { 138 path += "?" + BaseEncoding.base64().encode(payload); 139 } 140 BidirectionalStream.Builder builder = 141 streamFactory.newBidirectionalStreamBuilder(path, callback, executor); 142 if (payload != null) { 143 builder.setHttpMethod("GET"); 144 } else if (idempotent) { 145 builder.setHttpMethod("PUT"); 146 } 147 if (delayRequestHeader) { 148 builder.delayRequestHeadersUntilFirstFlush(true); 149 } 150 if (annotation != null) { 151 ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(annotation); 152 } 153 if (annotations != null) { 154 for (Object o : annotations) { 155 ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(o); 156 } 157 } 158 setGrpcHeaders(builder); 159 stream = builder.build(); 160 stream.start(); 161 } 162 163 @Override writeFrame( WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages)164 public void writeFrame( 165 WritableBuffer buffer, boolean endOfStream, boolean flush, int numMessages) { 166 synchronized (state.lock) { 167 if (state.cancelSent) { 168 return; 169 } 170 ByteBuffer byteBuffer; 171 if (buffer != null) { 172 byteBuffer = ((CronetWritableBuffer) buffer).buffer(); 173 byteBuffer.flip(); 174 } else { 175 byteBuffer = EMPTY_BUFFER; 176 } 177 onSendingBytes(byteBuffer.remaining()); 178 if (!state.streamReady) { 179 state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush)); 180 } else { 181 streamWrite(byteBuffer, endOfStream, flush); 182 } 183 } 184 } 185 186 @Override request(final int numMessages)187 public void request(final int numMessages) { 188 synchronized (state.lock) { 189 state.requestMessagesFromDeframer(numMessages); 190 } 191 } 192 193 @Override cancel(Status reason)194 public void cancel(Status reason) { 195 synchronized (state.lock) { 196 if (state.cancelSent) { 197 return; 198 } 199 state.cancelSent = true; 200 state.cancelReason = reason; 201 state.clearPendingData(); 202 if (stream != null) { 203 // Will report stream finish when BidirectionalStreamCallback.onCanceled is called. 204 stream.cancel(); 205 } else { 206 transport.finishStream(CronetClientStream.this, reason); 207 } 208 } 209 } 210 } 211 212 class TransportState extends Http2ClientStreamTransportState { 213 private final Object lock; 214 @GuardedBy("lock") 215 private Queue<PendingData> pendingData = new LinkedList<PendingData>(); 216 @GuardedBy("lock") 217 private boolean streamReady; 218 @GuardedBy("lock") 219 private boolean cancelSent = false; 220 @GuardedBy("lock") 221 private int bytesPendingProcess; 222 @GuardedBy("lock") 223 private Status cancelReason; 224 @GuardedBy("lock") 225 private boolean readClosed; 226 @GuardedBy("lock") 227 private boolean firstWriteComplete; 228 TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, TransportTracer transportTracer)229 public TransportState( 230 int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock, 231 TransportTracer transportTracer) { 232 super(maxMessageSize, statsTraceCtx, transportTracer); 233 this.lock = Preconditions.checkNotNull(lock, "lock"); 234 } 235 236 @GuardedBy("lock") start(StreamBuilderFactory factory)237 public void start(StreamBuilderFactory factory) { 238 streamFactory = factory; 239 } 240 241 @GuardedBy("lock") 242 @Override onStreamAllocated()243 protected void onStreamAllocated() { 244 super.onStreamAllocated(); 245 } 246 247 @GuardedBy("lock") 248 @Override http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers)249 protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) { 250 stream.cancel(); 251 transportReportStatus(status, stopDelivery, trailers); 252 } 253 254 @GuardedBy("lock") 255 @Override deframeFailed(Throwable cause)256 public void deframeFailed(Throwable cause) { 257 http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata()); 258 } 259 260 @Override runOnTransportThread(final Runnable r)261 public void runOnTransportThread(final Runnable r) { 262 synchronized (lock) { 263 r.run(); 264 } 265 } 266 267 @GuardedBy("lock") 268 @Override bytesRead(int processedBytes)269 public void bytesRead(int processedBytes) { 270 bytesPendingProcess -= processedBytes; 271 if (bytesPendingProcess == 0 && !readClosed) { 272 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 273 Log.v(LOG_TAG, "BidirectionalStream.read"); 274 } 275 stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); 276 } 277 } 278 279 @GuardedBy("lock") transportHeadersReceived(Metadata metadata, boolean endOfStream)280 private void transportHeadersReceived(Metadata metadata, boolean endOfStream) { 281 if (endOfStream) { 282 transportTrailersReceived(metadata); 283 } else { 284 transportHeadersReceived(metadata); 285 } 286 } 287 288 @GuardedBy("lock") transportDataReceived(ByteBuffer buffer, boolean endOfStream)289 private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) { 290 bytesPendingProcess += buffer.remaining(); 291 super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream); 292 } 293 294 @GuardedBy("lock") clearPendingData()295 private void clearPendingData() { 296 for (PendingData data : pendingData) { 297 data.buffer.clear(); 298 } 299 pendingData.clear(); 300 } 301 302 @GuardedBy("lock") enqueuePendingData(PendingData data)303 private void enqueuePendingData(PendingData data) { 304 pendingData.add(data); 305 } 306 307 @GuardedBy("lock") writeAllPendingData()308 private void writeAllPendingData() { 309 for (PendingData data : pendingData) { 310 streamWrite(data.buffer, data.endOfStream, data.flush); 311 } 312 pendingData.clear(); 313 } 314 } 315 316 // TODO(ericgribkoff): move header related method to a common place like GrpcUtil. isApplicationHeader(String key)317 private static boolean isApplicationHeader(String key) { 318 // Don't allow reserved non HTTP/2 pseudo headers to be added 319 // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character. 320 return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key) 321 && !USER_AGENT_KEY.name().equalsIgnoreCase(key) 322 && !TE_HEADER.name().equalsIgnoreCase(key); 323 } 324 setGrpcHeaders(BidirectionalStream.Builder builder)325 private void setGrpcHeaders(BidirectionalStream.Builder builder) { 326 // Psuedo-headers are set by cronet. 327 // All non-pseudo headers must come after pseudo headers. 328 // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed. 329 builder.addHeader(USER_AGENT_KEY.name(), userAgent); 330 builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC); 331 builder.addHeader("te", GrpcUtil.TE_TRAILERS); 332 333 // Now add any application-provided headers. 334 // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between 335 // String and byte array. 336 byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers); 337 for (int i = 0; i < serializedHeaders.length; i += 2) { 338 String key = new String(serializedHeaders[i], Charset.forName("UTF-8")); 339 // TODO(ericgribkoff): log an error or throw an exception 340 if (isApplicationHeader(key)) { 341 String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8")); 342 builder.addHeader(key, value); 343 } 344 } 345 } 346 streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush)347 private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) { 348 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 349 Log.v(LOG_TAG, "BidirectionalStream.write"); 350 } 351 stream.write(buffer, endOfStream); 352 if (flush) { 353 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 354 Log.v(LOG_TAG, "BidirectionalStream.flush"); 355 } 356 stream.flush(); 357 } 358 } 359 finishStream(Status status)360 private void finishStream(Status status) { 361 transport.finishStream(this, status); 362 } 363 364 @Override getAttributes()365 public Attributes getAttributes() { 366 return Attributes.EMPTY; 367 } 368 369 class BidirectionalStreamCallback extends BidirectionalStream.Callback { 370 private List<Map.Entry<String, String>> trailerList; 371 372 @Override onStreamReady(BidirectionalStream stream)373 public void onStreamReady(BidirectionalStream stream) { 374 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 375 Log.v(LOG_TAG, "onStreamReady"); 376 } 377 synchronized (state.lock) { 378 // Now that the stream is ready, call the listener's onReady callback if 379 // appropriate. 380 state.onStreamAllocated(); 381 state.streamReady = true; 382 state.writeAllPendingData(); 383 } 384 } 385 386 @Override onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)387 public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) { 388 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 389 Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList()); 390 Log.v(LOG_TAG, "BidirectionalStream.read"); 391 } 392 reportHeaders(info.getAllHeadersAsList(), false); 393 stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); 394 } 395 396 @Override onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)397 public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, 398 ByteBuffer buffer, boolean endOfStream) { 399 buffer.flip(); 400 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 401 Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining()); 402 } 403 404 synchronized (state.lock) { 405 state.readClosed = endOfStream; 406 // The endOfStream in gRPC has a different meaning so we always call transportDataReceived 407 // with endOfStream=false. 408 if (buffer.remaining() != 0) { 409 state.transportDataReceived(buffer, false); 410 } 411 } 412 if (endOfStream && trailerList != null) { 413 // Process trailers if we have already received any. 414 reportHeaders(trailerList, true); 415 } 416 } 417 418 @Override onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)419 public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, 420 ByteBuffer buffer, boolean endOfStream) { 421 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 422 Log.v(LOG_TAG, "onWriteCompleted"); 423 } 424 synchronized (state.lock) { 425 if (!state.firstWriteComplete) { 426 // Cronet API doesn't notify when headers are written to wire, but it occurs before first 427 // onWriteCompleted callback. 428 state.firstWriteComplete = true; 429 statsTraceCtx.clientOutboundHeaders(); 430 } 431 state.onSentBytes(buffer.position()); 432 } 433 } 434 435 @Override onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, UrlResponseInfo.HeaderBlock trailers)436 public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, 437 UrlResponseInfo.HeaderBlock trailers) { 438 processTrailers(trailers.getAsList()); 439 } 440 441 // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be 442 // mocked. 443 @VisibleForTesting processTrailers(List<Map.Entry<String, String>> trailerList)444 void processTrailers(List<Map.Entry<String, String>> trailerList) { 445 this.trailerList = trailerList; 446 boolean readClosed; 447 synchronized (state.lock) { 448 readClosed = state.readClosed; 449 } 450 if (readClosed) { 451 // There's no pending onReadCompleted callback so we can report trailers now. 452 reportHeaders(trailerList, true); 453 } 454 // Otherwise report trailers in onReadCompleted, or onSucceeded. 455 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 456 Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString()); 457 } 458 } 459 460 @Override onSucceeded(BidirectionalStream stream, UrlResponseInfo info)461 public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { 462 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 463 Log.v(LOG_TAG, "onSucceeded"); 464 } 465 466 if (!haveTrailersBeenReported()) { 467 if (trailerList != null) { 468 reportHeaders(trailerList, true); 469 } else if (info != null) { 470 reportHeaders(info.getAllHeadersAsList(), true); 471 } else { 472 throw new AssertionError("No response header or trailer"); 473 } 474 } 475 finishStream(toGrpcStatus(info)); 476 } 477 478 @Override onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error)479 public void onFailed(BidirectionalStream stream, UrlResponseInfo info, 480 CronetException error) { 481 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 482 Log.v(LOG_TAG, "onFailed"); 483 } 484 finishStream(Status.UNAVAILABLE.withCause(error)); 485 } 486 487 @Override onCanceled(BidirectionalStream stream, UrlResponseInfo info)488 public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) { 489 if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { 490 Log.v(LOG_TAG, "onCanceled"); 491 } 492 Status status; 493 synchronized (state.lock) { 494 if (state.cancelReason != null) { 495 status = state.cancelReason; 496 } else if (info != null) { 497 status = toGrpcStatus(info); 498 } else { 499 status = Status.CANCELLED.withDescription("stream cancelled without reason"); 500 } 501 } 502 finishStream(status); 503 } 504 reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream)505 private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) { 506 // TODO(ericgribkoff): create new utility methods to eliminate all these conversions 507 List<String> headerList = new ArrayList<>(); 508 for (Map.Entry<String, String> entry : headers) { 509 headerList.add(entry.getKey()); 510 headerList.add(entry.getValue()); 511 } 512 513 byte[][] headerValues = new byte[headerList.size()][]; 514 for (int i = 0; i < headerList.size(); i += 2) { 515 headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8")); 516 headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8")); 517 } 518 Metadata metadata = 519 InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues)); 520 synchronized (state.lock) { 521 // There's no pending onReadCompleted callback so we can report trailers now. 522 state.transportHeadersReceived(metadata, endOfStream); 523 } 524 } 525 haveTrailersBeenReported()526 private boolean haveTrailersBeenReported() { 527 synchronized (state.lock) { 528 return trailerList != null && state.readClosed; 529 } 530 } 531 toGrpcStatus(UrlResponseInfo info)532 private Status toGrpcStatus(UrlResponseInfo info) { 533 return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode()); 534 } 535 } 536 537 private static class PendingData { 538 ByteBuffer buffer; 539 boolean endOfStream; 540 boolean flush; 541 PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush)542 PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) { 543 this.buffer = buffer; 544 this.endOfStream = endOfStream; 545 this.flush = flush; 546 } 547 } 548 } 549