1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY; 22 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 23 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; 24 import static java.lang.Math.max; 25 26 import com.google.common.annotations.VisibleForTesting; 27 import com.google.common.base.Preconditions; 28 import io.grpc.Codec; 29 import io.grpc.Compressor; 30 import io.grpc.Deadline; 31 import io.grpc.Decompressor; 32 import io.grpc.DecompressorRegistry; 33 import io.grpc.Metadata; 34 import io.grpc.Status; 35 import io.grpc.internal.ClientStreamListener.RpcProgress; 36 import java.io.InputStream; 37 import java.util.concurrent.TimeUnit; 38 import java.util.logging.Level; 39 import java.util.logging.Logger; 40 import javax.annotation.Nullable; 41 42 /** 43 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to 44 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called 45 * from the sending application thread. 46 */ 47 public abstract class AbstractClientStream extends AbstractStream 48 implements ClientStream, MessageFramer.Sink { 49 50 private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName()); 51 52 /** 53 * A sink for outbound operations, separated from the stream simply to avoid name 54 * collisions/confusion. Only called from application thread. 55 */ 56 protected interface Sink { 57 /** 58 * Sends the request headers to the remote end point. 59 * 60 * @param metadata the metadata to be sent 61 * @param payload the payload needs to be sent in the headers if not null. Should only be used 62 * when sending an unary GET request 63 */ writeHeaders(Metadata metadata, @Nullable byte[] payload)64 void writeHeaders(Metadata metadata, @Nullable byte[] payload); 65 66 /** 67 * Sends an outbound frame to the remote end point. 68 * 69 * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code 70 * endOfStream} with no data to send 71 * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be 72 * {@code true} if this is {@code true} 73 * @param flush {@code true} if more data may not be arriving soon 74 * @Param numMessages the number of messages this series of frames represents, must be >= 0. 75 */ writeFrame( @ullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)76 void writeFrame( 77 @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages); 78 79 /** 80 * Requests up to the given number of messages from the call to be delivered to the client. This 81 * should end up triggering {@link TransportState#requestMessagesFromDeframer(int)} on the 82 * transport thread. 83 */ request(int numMessages)84 void request(int numMessages); 85 86 /** 87 * Tears down the stream, typically in the event of a timeout. This method may be called 88 * multiple times and from any thread. 89 * 90 * <p>This is a clone of {@link ClientStream#cancel(Status)}; 91 * {@link AbstractClientStream#cancel} delegates to this method. 92 */ cancel(Status status)93 void cancel(Status status); 94 } 95 96 private final TransportTracer transportTracer; 97 private final Framer framer; 98 private boolean useGet; 99 private Metadata headers; 100 /** 101 * Whether cancel() has been called. This is not strictly necessary, but removes the delay between 102 * cancel() being called and isReady() beginning to return false, since cancel is commonly 103 * processed asynchronously. 104 */ 105 private volatile boolean cancelled; 106 AbstractClientStream( WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, Metadata headers, boolean useGet)107 protected AbstractClientStream( 108 WritableBufferAllocator bufferAllocator, 109 StatsTraceContext statsTraceCtx, 110 TransportTracer transportTracer, 111 Metadata headers, 112 boolean useGet) { 113 checkNotNull(headers, "headers"); 114 this.transportTracer = checkNotNull(transportTracer, "transportTracer"); 115 this.useGet = useGet; 116 if (!useGet) { 117 framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); 118 this.headers = headers; 119 } else { 120 framer = new GetFramer(headers, statsTraceCtx); 121 } 122 } 123 124 @Override setDeadline(Deadline deadline)125 public void setDeadline(Deadline deadline) { 126 headers.discardAll(TIMEOUT_KEY); 127 long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); 128 headers.put(TIMEOUT_KEY, effectiveTimeout); 129 } 130 131 @Override setMaxOutboundMessageSize(int maxSize)132 public void setMaxOutboundMessageSize(int maxSize) { 133 framer.setMaxOutboundMessageSize(maxSize); 134 } 135 136 @Override setMaxInboundMessageSize(int maxSize)137 public void setMaxInboundMessageSize(int maxSize) { 138 transportState().setMaxInboundMessageSize(maxSize); 139 } 140 141 @Override setFullStreamDecompression(boolean fullStreamDecompression)142 public final void setFullStreamDecompression(boolean fullStreamDecompression) { 143 transportState().setFullStreamDecompression(fullStreamDecompression); 144 } 145 146 @Override setDecompressorRegistry(DecompressorRegistry decompressorRegistry)147 public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { 148 transportState().setDecompressorRegistry(decompressorRegistry); 149 } 150 151 /** {@inheritDoc} */ 152 @Override transportState()153 protected abstract TransportState transportState(); 154 155 @Override start(ClientStreamListener listener)156 public final void start(ClientStreamListener listener) { 157 transportState().setListener(listener); 158 if (!useGet) { 159 abstractClientStreamSink().writeHeaders(headers, null); 160 headers = null; 161 } 162 } 163 164 /** 165 * Sink for transport to be called to perform outbound operations. Each stream must have its own 166 * unique sink. 167 */ abstractClientStreamSink()168 protected abstract Sink abstractClientStreamSink(); 169 170 @Override framer()171 protected final Framer framer() { 172 return framer; 173 } 174 175 @Override request(int numMessages)176 public final void request(int numMessages) { 177 abstractClientStreamSink().request(numMessages); 178 } 179 180 @Override deliverFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages)181 public final void deliverFrame( 182 WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { 183 Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS"); 184 abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages); 185 } 186 187 @Override halfClose()188 public final void halfClose() { 189 if (!transportState().isOutboundClosed()) { 190 transportState().setOutboundClosed(); 191 endOfMessages(); 192 } 193 } 194 195 @Override cancel(Status reason)196 public final void cancel(Status reason) { 197 Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status"); 198 cancelled = true; 199 abstractClientStreamSink().cancel(reason); 200 } 201 202 @Override isReady()203 public final boolean isReady() { 204 return super.isReady() && !cancelled; 205 } 206 getTransportTracer()207 protected TransportTracer getTransportTracer() { 208 return transportTracer; 209 } 210 211 /** This should only called from the transport thread. */ 212 protected abstract static class TransportState extends AbstractStream.TransportState { 213 /** Whether listener.closed() has been called. */ 214 private final StatsTraceContext statsTraceCtx; 215 private boolean listenerClosed; 216 private ClientStreamListener listener; 217 private boolean fullStreamDecompression; 218 private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); 219 220 private boolean deframerClosed = false; 221 private Runnable deframerClosedTask; 222 223 /** Whether the client has half-closed the stream. */ 224 private volatile boolean outboundClosed; 225 226 /** 227 * Whether the stream is closed from the transport's perspective. This can differ from {@link 228 * #listenerClosed} because there may still be messages buffered to deliver to the application. 229 */ 230 private boolean statusReported; 231 private Metadata trailers; 232 private Status trailerStatus; 233 TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)234 protected TransportState( 235 int maxMessageSize, 236 StatsTraceContext statsTraceCtx, 237 TransportTracer transportTracer) { 238 super(maxMessageSize, statsTraceCtx, transportTracer); 239 this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); 240 } 241 setFullStreamDecompression(boolean fullStreamDecompression)242 private void setFullStreamDecompression(boolean fullStreamDecompression) { 243 this.fullStreamDecompression = fullStreamDecompression; 244 } 245 setDecompressorRegistry(DecompressorRegistry decompressorRegistry)246 private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { 247 checkState(this.listener == null, "Already called start"); 248 this.decompressorRegistry = 249 checkNotNull(decompressorRegistry, "decompressorRegistry"); 250 } 251 252 @VisibleForTesting setListener(ClientStreamListener listener)253 public final void setListener(ClientStreamListener listener) { 254 checkState(this.listener == null, "Already called setListener"); 255 this.listener = checkNotNull(listener, "listener"); 256 } 257 258 @Override deframerClosed(boolean hasPartialMessage)259 public void deframerClosed(boolean hasPartialMessage) { 260 deframerClosed = true; 261 262 if (trailerStatus != null) { 263 if (trailerStatus.isOk() && hasPartialMessage) { 264 trailerStatus = Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"); 265 trailers = new Metadata(); 266 } 267 transportReportStatus(trailerStatus, false, trailers); 268 } else { 269 checkState(statusReported, "status should have been reported on deframer closed"); 270 } 271 272 if (deframerClosedTask != null) { 273 deframerClosedTask.run(); 274 deframerClosedTask = null; 275 } 276 } 277 278 @Override listener()279 protected final ClientStreamListener listener() { 280 return listener; 281 } 282 setOutboundClosed()283 private final void setOutboundClosed() { 284 outboundClosed = true; 285 } 286 isOutboundClosed()287 protected final boolean isOutboundClosed() { 288 return outboundClosed; 289 } 290 291 /** 292 * Called by transport implementations when they receive headers. 293 * 294 * @param headers the parsed headers 295 */ inboundHeadersReceived(Metadata headers)296 protected void inboundHeadersReceived(Metadata headers) { 297 checkState(!statusReported, "Received headers on closed stream"); 298 statsTraceCtx.clientInboundHeaders(); 299 300 boolean compressedStream = false; 301 String streamEncoding = headers.get(CONTENT_ENCODING_KEY); 302 if (fullStreamDecompression && streamEncoding != null) { 303 if (streamEncoding.equalsIgnoreCase("gzip")) { 304 setFullStreamDecompressor(new GzipInflatingBuffer()); 305 compressedStream = true; 306 } else if (!streamEncoding.equalsIgnoreCase("identity")) { 307 deframeFailed( 308 Status.INTERNAL 309 .withDescription( 310 String.format("Can't find full stream decompressor for %s", streamEncoding)) 311 .asRuntimeException()); 312 return; 313 } 314 } 315 316 String messageEncoding = headers.get(MESSAGE_ENCODING_KEY); 317 if (messageEncoding != null) { 318 Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding); 319 if (decompressor == null) { 320 deframeFailed( 321 Status.INTERNAL 322 .withDescription(String.format("Can't find decompressor for %s", messageEncoding)) 323 .asRuntimeException()); 324 return; 325 } else if (decompressor != Codec.Identity.NONE) { 326 if (compressedStream) { 327 deframeFailed( 328 Status.INTERNAL 329 .withDescription( 330 String.format("Full stream and gRPC message encoding cannot both be set")) 331 .asRuntimeException()); 332 return; 333 } 334 setDecompressor(decompressor); 335 } 336 } 337 338 listener().headersRead(headers); 339 } 340 341 /** 342 * Processes the contents of a received data frame from the server. 343 * 344 * @param frame the received data frame. Its ownership is transferred to this method. 345 */ inboundDataReceived(ReadableBuffer frame)346 protected void inboundDataReceived(ReadableBuffer frame) { 347 checkNotNull(frame, "frame"); 348 boolean needToCloseFrame = true; 349 try { 350 if (statusReported) { 351 log.log(Level.INFO, "Received data on closed stream"); 352 return; 353 } 354 355 needToCloseFrame = false; 356 deframe(frame); 357 } finally { 358 if (needToCloseFrame) { 359 frame.close(); 360 } 361 } 362 } 363 364 /** 365 * Processes the trailers and status from the server. 366 * 367 * @param trailers the received trailers 368 * @param status the status extracted from the trailers 369 */ inboundTrailersReceived(Metadata trailers, Status status)370 protected void inboundTrailersReceived(Metadata trailers, Status status) { 371 checkNotNull(status, "status"); 372 checkNotNull(trailers, "trailers"); 373 if (statusReported) { 374 log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}", 375 new Object[]{status, trailers}); 376 return; 377 } 378 this.trailers = trailers; 379 trailerStatus = status; 380 closeDeframer(false); 381 } 382 383 /** 384 * Report stream closure with status to the application layer if not already reported. This 385 * method must be called from the transport thread. 386 * 387 * @param status the new status to set 388 * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that 389 * may already be queued up in the deframer. If {@code false}, the listener will be 390 * notified immediately after all currently completed messages in the deframer have been 391 * delivered to the application. 392 * @param trailers new instance of {@code Trailers}, either empty or those returned by the 393 * server 394 */ transportReportStatus(final Status status, boolean stopDelivery, final Metadata trailers)395 public final void transportReportStatus(final Status status, boolean stopDelivery, 396 final Metadata trailers) { 397 transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers); 398 } 399 400 /** 401 * Report stream closure with status to the application layer if not already reported. This 402 * method must be called from the transport thread. 403 * 404 * @param status the new status to set 405 * @param rpcProgress RPC progress that the 406 * {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)} 407 * will receive 408 * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that 409 * may already be queued up in the deframer. If {@code false}, the listener will be 410 * notified immediately after all currently completed messages in the deframer have been 411 * delivered to the application. 412 * @param trailers new instance of {@code Trailers}, either empty or those returned by the 413 * server 414 */ transportReportStatus( final Status status, final RpcProgress rpcProgress, boolean stopDelivery, final Metadata trailers)415 public final void transportReportStatus( 416 final Status status, final RpcProgress rpcProgress, boolean stopDelivery, 417 final Metadata trailers) { 418 checkNotNull(status, "status"); 419 checkNotNull(trailers, "trailers"); 420 // If stopDelivery, we continue in case previous invocation is waiting for stall 421 if (statusReported && !stopDelivery) { 422 return; 423 } 424 statusReported = true; 425 onStreamDeallocated(); 426 427 if (deframerClosed) { 428 deframerClosedTask = null; 429 closeListener(status, rpcProgress, trailers); 430 } else { 431 deframerClosedTask = 432 new Runnable() { 433 @Override 434 public void run() { 435 closeListener(status, rpcProgress, trailers); 436 } 437 }; 438 closeDeframer(stopDelivery); 439 } 440 } 441 442 /** 443 * Closes the listener if not previously closed. 444 * 445 * @throws IllegalStateException if the call has not yet been started. 446 */ closeListener( Status status, RpcProgress rpcProgress, Metadata trailers)447 private void closeListener( 448 Status status, RpcProgress rpcProgress, Metadata trailers) { 449 if (!listenerClosed) { 450 listenerClosed = true; 451 statsTraceCtx.streamClosed(status); 452 listener().closed(status, rpcProgress, trailers); 453 if (getTransportTracer() != null) { 454 getTransportTracer().reportStreamClosed(status.isOk()); 455 } 456 } 457 } 458 } 459 460 private class GetFramer implements Framer { 461 private Metadata headers; 462 private boolean closed; 463 private final StatsTraceContext statsTraceCtx; 464 private byte[] payload; 465 GetFramer(Metadata headers, StatsTraceContext statsTraceCtx)466 public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) { 467 this.headers = checkNotNull(headers, "headers"); 468 this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); 469 } 470 471 @Override writePayload(InputStream message)472 public void writePayload(InputStream message) { 473 checkState(payload == null, "writePayload should not be called multiple times"); 474 try { 475 payload = IoUtils.toByteArray(message); 476 } catch (java.io.IOException ex) { 477 throw new RuntimeException(ex); 478 } 479 statsTraceCtx.outboundMessage(0); 480 statsTraceCtx.outboundMessageSent(0, payload.length, payload.length); 481 statsTraceCtx.outboundUncompressedSize(payload.length); 482 // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode 483 // it using e.g., base64. However, we are not supposed to know such detail here. 484 // 485 // We don't want to move this line to where the encoding happens either, because we'd better 486 // contain the message stats reporting in Framer as suggested in StatsTraceContext. 487 // Scattering the reporting sites increases the risk of mis-counting or double-counting. 488 // 489 // Because the payload is usually very small, people shouldn't care about the size difference 490 // caused by encoding. 491 statsTraceCtx.outboundWireSize(payload.length); 492 } 493 494 @Override flush()495 public void flush() {} 496 497 @Override isClosed()498 public boolean isClosed() { 499 return closed; 500 } 501 502 /** Closes, with flush. */ 503 @Override close()504 public void close() { 505 closed = true; 506 checkState(payload != null, 507 "Lack of request message. GET request is only supported for unary requests"); 508 abstractClientStreamSink().writeHeaders(headers, payload); 509 payload = null; 510 headers = null; 511 } 512 513 /** Closes, without flush. */ 514 @Override dispose()515 public void dispose() { 516 closed = true; 517 payload = null; 518 headers = null; 519 } 520 521 // Compression is not supported for GET encoding. 522 @Override setMessageCompression(boolean enable)523 public Framer setMessageCompression(boolean enable) { 524 return this; 525 } 526 527 @Override setCompressor(Compressor compressor)528 public Framer setCompressor(Compressor compressor) { 529 return this; 530 } 531 532 // TODO(zsurocking): support this 533 @Override setMaxOutboundMessageSize(int maxSize)534 public void setMaxOutboundMessageSize(int maxSize) {} 535 } 536 } 537