/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Deadline;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
 * from the sending application thread.
 */
public abstract class AbstractClientStream extends AbstractStream
    implements ClientStream, MessageFramer.Sink {

  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());

  /**
   * A sink for outbound operations, separated from the stream simply to avoid name
   * collisions/confusion. Only called from application thread.
   */
  protected interface Sink {
    /**
     * Sends the request headers to the remote end point.
     *
     * @param metadata the metadata to be sent
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
     *     when sending an unary GET request
     */
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);

    /**
     * Sends an outbound frame to the remote end point.
     *
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
     *     endOfStream} with no data to send
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
     *     {@code true} if this is {@code true}
     * @param flush {@code true} if more data may not be arriving soon
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
     */
    void writeFrame(
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);

    /**
     * Tears down the stream, typically in the event of a timeout. This method may be called
     * multiple times and from any thread.
     *
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
     * {@link AbstractClientStream#cancel} delegates to this method.
     */
    void cancel(Status status);
  }

  private final TransportTracer transportTracer;
  private final Framer framer;
  private final boolean shouldBeCountedForInUse;
  private final boolean useGet;
  /**
   * The request headers. Held from construction until {@link #start} so that {@link #setDeadline}
   * can still amend them; cleared in {@code start()}. For GET streams this is the same instance
   * that was handed to the {@link GetFramer}.
   */
  private Metadata headers;
  /**
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
   * processed asynchronously.
   */
  private volatile boolean cancelled;

  /**
   * Creates the stream and picks the framer: a {@link MessageFramer} for regular requests, or a
   * {@link GetFramer} when {@code useGet} is set.
   *
   * @param bufferAllocator allocator passed to the {@link MessageFramer} (unused for GET)
   * @param statsTraceCtx stats/tracing context passed to the framer
   * @param transportTracer transport tracer; must not be null
   * @param headers the request headers; must not be null
   * @param callOptions call options, consulted for the in-use accounting flag
   * @param useGet whether the request is sent as a GET (payload carried with the headers)
   */
  protected AbstractClientStream(
      WritableBufferAllocator bufferAllocator,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer,
      Metadata headers,
      CallOptions callOptions,
      boolean useGet) {
    checkNotNull(headers, "headers");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
    this.useGet = useGet;
    // Always keep a reference: setDeadline() mutates the headers before start(), for GET streams
    // as well. Previously this was only assigned for non-GET streams, so setDeadline() on a GET
    // stream dereferenced null.
    this.headers = headers;
    if (!useGet) {
      framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
    } else {
      framer = new GetFramer(headers, statsTraceCtx);
    }
  }

  /**
   * Replaces any existing timeout header with the time remaining until {@code deadline}, clamped
   * to be non-negative. The headers are released in {@link #start}, so this has to be called
   * before then.
   */
  @Override
  public void setDeadline(Deadline deadline) {
    headers.discardAll(TIMEOUT_KEY);
    long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
    headers.put(TIMEOUT_KEY, effectiveTimeout);
  }

  @Override
  public void setMaxOutboundMessageSize(int maxSize) {
    framer.setMaxOutboundMessageSize(maxSize);
  }

  @Override
  public void setMaxInboundMessageSize(int maxSize) {
    transportState().setMaxInboundMessageSize(maxSize);
  }

  @Override
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
    transportState().setFullStreamDecompression(fullStreamDecompression);
  }

  @Override
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
    transportState().setDecompressorRegistry(decompressorRegistry);
  }

  /** {@inheritDoc} */
  @Override
  protected abstract TransportState transportState();

  /**
   * Registers the listener and, for non-GET streams, writes the request headers to the sink. GET
   * streams defer the header write to {@link GetFramer#close()}, where the payload is known.
   */
  @Override
  public final void start(ClientStreamListener listener) {
    transportState().setListener(listener);
    if (!useGet) {
      abstractClientStreamSink().writeHeaders(headers, null);
    }
    // The stream no longer needs the headers; a GetFramer keeps its own reference.
    headers = null;
  }

  /**
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
   * unique sink.
   */
  protected abstract Sink abstractClientStreamSink();

  @Override
  protected final Framer framer() {
    return framer;
  }

  /**
   * Returns true if this stream should be counted when determining the in-use state of the
   * transport.
   */
  public final boolean shouldBeCountedForInUse() {
    return shouldBeCountedForInUse;
  }

  @Override
  public final void deliverFrame(
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
  }

  @Override
  public final void halfClose() {
    if (!transportState().isOutboundClosed()) {
      transportState().setOutboundClosed();
      endOfMessages();
    }
  }

  @Override
  public final void cancel(Status reason) {
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
    cancelled = true;
    abstractClientStreamSink().cancel(reason);
  }

  @Override
  public final boolean isReady() {
    return super.isReady() && !cancelled;
  }

  @Override
  public final void appendTimeoutInsight(InsightBuilder insight) {
    Attributes attrs = getAttributes();
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
  }

  protected TransportTracer getTransportTracer() {
    return transportTracer;
  }

  /** This should only be called from the transport thread. */
  protected abstract static class TransportState extends AbstractStream.TransportState {
    private final StatsTraceContext statsTraceCtx;
    /** Whether listener.closed() has been called. */
    private boolean listenerClosed;
    private ClientStreamListener listener;
    private boolean fullStreamDecompression;
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();

    private boolean deframerClosed = false;
    private Runnable deframerClosedTask;

    /** Whether the client has half-closed the stream. */
    private volatile boolean outboundClosed;

    /**
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
     */
    private boolean statusReported;
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
    private boolean statusReportedIsOk;

    protected TransportState(
        int maxMessageSize,
        StatsTraceContext statsTraceCtx,
        TransportTracer transportTracer) {
      super(maxMessageSize, statsTraceCtx, transportTracer);
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
    }

    private void setFullStreamDecompression(boolean fullStreamDecompression) {
      this.fullStreamDecompression = fullStreamDecompression;
    }

    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
      checkState(this.listener == null, "Already called start");
      this.decompressorRegistry =
          checkNotNull(decompressorRegistry, "decompressorRegistry");
    }

    @VisibleForTesting
    public final void setListener(ClientStreamListener listener) {
      checkState(this.listener == null, "Already called setListener");
      this.listener = checkNotNull(listener, "listener");
    }

    @Override
    public void deframerClosed(boolean hasPartialMessage) {
      checkState(statusReported, "status should have been reported on deframer closed");
      deframerClosed = true;
      if (statusReportedIsOk && hasPartialMessage) {
        // An OK status with a truncated trailing message is overridden with an error.
        transportReportStatus(
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
            true,
            new Metadata());
      }
      if (deframerClosedTask != null) {
        deframerClosedTask.run();
        deframerClosedTask = null;
      }
    }

    @Override
    protected final ClientStreamListener listener() {
      return listener;
    }

    private final void setOutboundClosed() {
      outboundClosed = true;
    }

    protected final boolean isOutboundClosed() {
      return outboundClosed;
    }

    /**
     * Called by transport implementations when they receive headers.
     *
     * @param headers the parsed headers
     */
    protected void inboundHeadersReceived(Metadata headers) {
      checkState(!statusReported, "Received headers on closed stream");
      statsTraceCtx.clientInboundHeaders();

      boolean compressedStream = false;
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
      if (fullStreamDecompression && streamEncoding != null) {
        if (streamEncoding.equalsIgnoreCase("gzip")) {
          setFullStreamDecompressor(new GzipInflatingBuffer());
          compressedStream = true;
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
          deframeFailed(
              Status.INTERNAL
                  .withDescription(
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
                  .asRuntimeException());
          return;
        }
      }

      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
      if (messageEncoding != null) {
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
        if (decompressor == null) {
          deframeFailed(
              Status.INTERNAL
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
                  .asRuntimeException());
          return;
        } else if (decompressor != Codec.Identity.NONE) {
          if (compressedStream) {
            deframeFailed(
                Status.INTERNAL
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
                    .asRuntimeException());
            return;
          }
          setDecompressor(decompressor);
        }
      }

      listener().headersRead(headers);
    }

    /**
     * Processes the contents of a received data frame from the server.
     *
     * @param frame the received data frame. Its ownership is transferred to this method.
     */
    protected void inboundDataReceived(ReadableBuffer frame) {
      checkNotNull(frame, "frame");
      boolean needToCloseFrame = true;
      try {
        if (statusReported) {
          log.log(Level.INFO, "Received data on closed stream");
          return;
        }

        // Cleared before calling deframe(), so the finally block below does not close the frame
        // once it has been passed on.
        needToCloseFrame = false;
        deframe(frame);
      } finally {
        if (needToCloseFrame) {
          frame.close();
        }
      }
    }

    /**
     * Processes the trailers and status from the server.
     *
     * @param trailers the received trailers
     * @param status the status extracted from the trailers
     */
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
      checkNotNull(status, "status");
      checkNotNull(trailers, "trailers");
      if (statusReported) {
        // java.util.logging formats parameters with MessageFormat, whose placeholders are
        // zero-based: {0} is the status and {1} the trailers.
        log.log(Level.INFO, "Received trailers on closed stream:\n {0}\n {1}",
            new Object[]{status, trailers});
        return;
      }
      statsTraceCtx.clientInboundTrailers(trailers);
      transportReportStatus(status, false, trailers);
    }

    /**
     * Report stream closure with status to the application layer if not already reported. This
     * method must be called from the transport thread.
     *
     * @param status the new status to set
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
     *        may already be queued up in the deframer. If {@code false}, the listener will be
     *        notified immediately after all currently completed messages in the deframer have been
     *        delivered to the application.
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
     *        server
     */
    public final void transportReportStatus(final Status status, boolean stopDelivery,
        final Metadata trailers) {
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
    }

    /**
     * Report stream closure with status to the application layer if not already reported. This
     * method must be called from the transport thread.
     *
     * @param status the new status to set
     * @param rpcProgress RPC progress that the
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
     *        will receive
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
     *        may already be queued up in the deframer and overrides any previously queued status.
     *        If {@code false}, the listener will be notified immediately after all currently
     *        completed messages in the deframer have been delivered to the application.
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
     *        server
     */
    public final void transportReportStatus(
        final Status status,
        final RpcProgress rpcProgress,
        boolean stopDelivery,
        final Metadata trailers) {
      checkNotNull(status, "status");
      checkNotNull(trailers, "trailers");
      // If stopDelivery, we continue in case previous invocation is waiting for stall
      if (statusReported && !stopDelivery) {
        return;
      }
      statusReported = true;
      statusReportedIsOk = status.isOk();
      onStreamDeallocated();

      if (deframerClosed) {
        deframerClosedTask = null;
        closeListener(status, rpcProgress, trailers);
      } else {
        // Defer the listener notification until deframerClosed() runs this task.
        deframerClosedTask =
            new Runnable() {
              @Override
              public void run() {
                closeListener(status, rpcProgress, trailers);
              }
            };
        closeDeframer(stopDelivery);
      }
    }

    /**
     * Closes the listener if not previously closed: records the stream closure on the stats
     * context, notifies the listener and, when a transport tracer is available, reports the
     * closure to it.
     *
     * <p>No explicit started-check is performed here; if no listener has been set yet, the call on
     * {@link #listener()} fails with a {@link NullPointerException}.
     */
    private void closeListener(
        Status status, RpcProgress rpcProgress, Metadata trailers) {
      if (!listenerClosed) {
        listenerClosed = true;
        statsTraceCtx.streamClosed(status);
        listener().closed(status, rpcProgress, trailers);
        if (getTransportTracer() != null) {
          getTransportTracer().reportStreamClosed(status.isOk());
        }
      }
    }
  }

  /**
   * Framer used for GET requests: buffers the single request message and hands it to the sink
   * together with the headers when closed.
   */
  private class GetFramer implements Framer {
    private Metadata headers;
    private boolean closed;
    private final StatsTraceContext statsTraceCtx;
    private byte[] payload;

    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
      this.headers = checkNotNull(headers, "headers");
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
    }

    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
    @Override
    public void writePayload(InputStream message) {
      checkState(payload == null, "writePayload should not be called multiple times");
      try {
        payload = ByteStreams.toByteArray(message);
      } catch (java.io.IOException ex) {
        throw new RuntimeException(ex);
      }
      statsTraceCtx.outboundMessage(0);
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
      statsTraceCtx.outboundUncompressedSize(payload.length);
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
      // it using e.g., base64.  However, we are not supposed to know such detail here.
      //
      // We don't want to move this line to where the encoding happens either, because we'd better
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
      //
      // Because the payload is usually very small, people shouldn't care about the size difference
      // caused by encoding.
      statsTraceCtx.outboundWireSize(payload.length);
    }

    @Override
    public void flush() {}

    @Override
    public boolean isClosed() {
      return closed;
    }

    /** Closes, with flush. */
    @Override
    public void close() {
      closed = true;
      checkState(payload != null,
          "Lack of request message. GET request is only supported for unary requests");
      abstractClientStreamSink().writeHeaders(headers, payload);
      payload = null;
      headers = null;
    }

    /** Closes, without flush. */
    @Override
    public void dispose() {
      closed = true;
      payload = null;
      headers = null;
    }

    // Compression is not supported for GET encoding.
    @Override
    public Framer setMessageCompression(boolean enable) {
      return this;
    }

    @Override
    public Framer setCompressor(Compressor compressor) {
      return this;
    }

    // TODO(zsurocking): support this
    @Override
    public void setMaxOutboundMessageSize(int maxSize) {}
  }
}