1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.binder.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 22 import android.os.Parcel; 23 import io.grpc.Attributes; 24 import io.grpc.Metadata; 25 import io.grpc.Status; 26 import io.grpc.StatusException; 27 import io.grpc.binder.InboundParcelablePolicy; 28 import io.grpc.internal.ClientStreamListener; 29 import io.grpc.internal.ClientStreamListener.RpcProgress; 30 import io.grpc.internal.ServerStream; 31 import io.grpc.internal.ServerStreamListener; 32 import io.grpc.internal.StatsTraceContext; 33 import io.grpc.internal.StreamListener; 34 import java.io.InputStream; 35 import java.util.ArrayList; 36 import javax.annotation.Nullable; 37 import javax.annotation.concurrent.GuardedBy; 38 39 /** 40 * Handles incoming binder transactions for a single stream, turning those transactions into calls 41 * to the stream listener. 42 * 43 * <p>Out-of-order messages are reassembled into their correct order. 44 */ 45 abstract class Inbound<L extends StreamListener> implements StreamListener.MessageProducer { 46 47 protected final BinderTransport transport; 48 protected final Attributes attributes; 49 final int callId; 50 51 // ========================== 52 // Values set when we're initialized. 53 54 @Nullable 55 @GuardedBy("this") 56 protected Outbound outbound; 57 58 @Nullable 59 @GuardedBy("this") 60 protected StatsTraceContext statsTraceContext; 61 62 @Nullable 63 @GuardedBy("this") 64 protected L listener; 65 66 // ========================== 67 // State of inbound data. 68 69 @Nullable 70 @GuardedBy("this") 71 private InputStream firstMessage; 72 73 @GuardedBy("this") 74 private int firstQueuedTransactionIndex; 75 76 @GuardedBy("this") 77 private int nextCompleteMessageEnd; 78 79 @Nullable 80 @GuardedBy("this") 81 private ArrayList<TransactionData> queuedTransactionData; 82 83 @GuardedBy("this") 84 private boolean suffixAvailable; 85 86 @GuardedBy("this") 87 private int suffixTransactionIndex; 88 89 @GuardedBy("this") 90 private int inboundDataSize; 91 92 // ========================== 93 // State of what we've delivered to gRPC. 94 95 /** 96 * Each rpc transmits (or receives) a prefix (including headers and possibly a method name), the 97 * data of zero or more request (or response) messages, and a suffix (possibly including a close 98 * status and trailers). 99 * 100 * <p>This enum represents those stages, for both availability (what we've been given), and 101 * delivery what we've sent. 102 */ 103 enum State { 104 // We aren't yet connected to a BinderStream instance and listener. Due to potentially 105 // out-of-order messages, a server-side instance can remain in this state for multiple 106 // transactions. 107 UNINITIALIZED, 108 109 // We're attached to a BinderStream instance and we have a listener we can report to. 110 // On the client-side, this happens as soon as the start() method is called (almost 111 // immediately), and on the server side, this happens as soon as we receive the prefix 112 // (so we know which method is being called). 113 INITIALIZED, 114 115 // We've delivered the prefix data to the listener. On the client side, this means we've 116 // delivered the response headers, and on the server side this state is effectively the same 117 // as INITIALIZED (since we initialize only by delivering the prefix). 118 PREFIX_DELIVERED, 119 120 // All messages have been received, and delivered to the listener. 121 ALL_MESSAGES_DELIVERED, 122 123 // We've delivered the suffix. 124 SUFFIX_DELIVERED, 125 126 // The stream is closed. 127 CLOSED 128 } 129 130 /* 131 * Represents which data we've delivered to the gRPC listener. 132 */ 133 @GuardedBy("this") 134 private State deliveryState = State.UNINITIALIZED; 135 136 @GuardedBy("this") 137 private int numReceivedMessages; 138 139 @GuardedBy("this") 140 private int numRequestedMessages; 141 142 @GuardedBy("this") 143 private boolean delivering; 144 145 @GuardedBy("this") 146 private boolean producingMessages; 147 Inbound(BinderTransport transport, Attributes attributes, int callId)148 private Inbound(BinderTransport transport, Attributes attributes, int callId) { 149 this.transport = transport; 150 this.attributes = attributes; 151 this.callId = callId; 152 } 153 154 @GuardedBy("this") init(Outbound outbound, L listener)155 final void init(Outbound outbound, L listener) { 156 this.outbound = outbound; 157 this.statsTraceContext = outbound.getStatsTraceContext(); 158 this.listener = listener; 159 if (!isClosed()) { 160 onDeliveryState(State.INITIALIZED); 161 } 162 } 163 unregister()164 final void unregister() { 165 transport.unregisterInbound(this); 166 } 167 countsForInUse()168 boolean countsForInUse() { 169 return false; 170 } 171 172 // ===================== 173 // Updates to delivery. 174 175 @GuardedBy("this") onDeliveryState(State deliveryState)176 protected final void onDeliveryState(State deliveryState) { 177 checkTransition(this.deliveryState, deliveryState); 178 this.deliveryState = deliveryState; 179 } 180 181 @GuardedBy("this") isClosed()182 protected final boolean isClosed() { 183 return deliveryState == State.CLOSED; 184 } 185 186 @GuardedBy("this") messageAvailable()187 private final boolean messageAvailable() { 188 return firstMessage != null || nextCompleteMessageEnd > 0; 189 } 190 191 @GuardedBy("this") receivedAllTransactions()192 private boolean receivedAllTransactions() { 193 return suffixAvailable && firstQueuedTransactionIndex >= suffixTransactionIndex; 194 } 195 196 // =================== 197 // Internals. 198 199 @GuardedBy("this") deliver()200 final void deliver() { 201 if (delivering) { 202 // Don't re-enter. 203 return; 204 } 205 delivering = true; 206 while (canDeliver()) { 207 deliverInternal(); 208 } 209 delivering = false; 210 } 211 212 @GuardedBy("this") canDeliver()213 private final boolean canDeliver() { 214 switch (deliveryState) { 215 case PREFIX_DELIVERED: 216 if (listener != null) { 217 if (producingMessages) { 218 // We're waiting for the listener to consume messages. Nothing to do. 219 return false; 220 } else if (messageAvailable()) { 221 // There's a message. We can deliver if we've been asked for messages, and we haven't 222 // already given the listener a MessageProducer. 223 return numRequestedMessages != 0; 224 } else { 225 // There are no messages available. Return true if that's the last of them, because we 226 // can send the suffix. 227 return receivedAllTransactions(); 228 } 229 } 230 return false; 231 case ALL_MESSAGES_DELIVERED: 232 return listener != null && suffixAvailable; 233 default: 234 return false; 235 } 236 } 237 238 @GuardedBy("this") 239 @SuppressWarnings("fallthrough") deliverInternal()240 private final void deliverInternal() { 241 switch (deliveryState) { 242 case PREFIX_DELIVERED: 243 if (producingMessages) { 244 break; 245 } else if (messageAvailable()) { 246 producingMessages = true; 247 listener.messagesAvailable(this); 248 break; 249 } else if (!suffixAvailable) { 250 break; 251 } 252 onDeliveryState(State.ALL_MESSAGES_DELIVERED); 253 // Fall-through. 254 case ALL_MESSAGES_DELIVERED: 255 if (suffixAvailable) { 256 onDeliveryState(State.SUFFIX_DELIVERED); 257 deliverSuffix(); 258 } 259 break; 260 default: 261 throw new AssertionError(); 262 } 263 } 264 265 /** Deliver the suffix to gRPC. */ deliverSuffix()266 protected abstract void deliverSuffix(); 267 268 @GuardedBy("this") closeOnCancel(Status status)269 final void closeOnCancel(Status status) { 270 closeAbnormal(Status.CANCELLED, status, false); 271 } 272 273 @GuardedBy("this") closeOutOfBand(Status status)274 private final void closeOutOfBand(Status status) { 275 closeAbnormal(status, status, true); 276 } 277 278 @GuardedBy("this") closeAbnormal(Status status)279 final void closeAbnormal(Status status) { 280 closeAbnormal(status, status, false); 281 } 282 283 @GuardedBy("this") closeAbnormal( Status outboundStatus, Status internalStatus, boolean isOobFromRemote)284 private final void closeAbnormal( 285 Status outboundStatus, Status internalStatus, boolean isOobFromRemote) { 286 if (!isClosed()) { 287 boolean wasInitialized = (deliveryState != State.UNINITIALIZED); 288 onDeliveryState(State.CLOSED); 289 if (wasInitialized) { 290 statsTraceContext.streamClosed(internalStatus); 291 } 292 if (!isOobFromRemote) { 293 transport.sendOutOfBandClose(callId, outboundStatus); 294 } 295 if (wasInitialized) { 296 deliverCloseAbnormal(internalStatus); 297 } 298 unregister(); 299 } 300 } 301 302 @GuardedBy("this") deliverCloseAbnormal(Status status)303 protected abstract void deliverCloseAbnormal(Status status); 304 onTransportReady()305 final void onTransportReady() { 306 // Report transport readiness to the listener, and the outbound data. 307 Outbound outbound = null; 308 StreamListener listener = null; 309 synchronized (this) { 310 outbound = this.outbound; 311 listener = this.listener; 312 } 313 if (listener != null) { 314 listener.onReady(); 315 } 316 if (outbound != null) { 317 try { 318 synchronized (outbound) { 319 outbound.onTransportReady(); 320 } 321 } catch (StatusException se) { 322 synchronized (this) { 323 closeAbnormal(se.getStatus()); 324 } 325 } 326 } 327 } 328 329 @GuardedBy("this") requestMessages(int num)330 public void requestMessages(int num) { 331 numRequestedMessages += num; 332 deliver(); 333 } 334 handleTransaction(Parcel parcel)335 final synchronized void handleTransaction(Parcel parcel) { 336 if (isClosed()) { 337 return; 338 } 339 try { 340 int flags = parcel.readInt(); 341 if (TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_OUT_OF_BAND_CLOSE)) { 342 closeOutOfBand(TransactionUtils.readStatus(flags, parcel)); 343 return; 344 } 345 int index = parcel.readInt(); 346 boolean hasPrefix = TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_PREFIX); 347 boolean hasMessageData = 348 TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_MESSAGE_DATA); 349 boolean hasSuffix = TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_SUFFIX); 350 if (hasPrefix) { 351 handlePrefix(flags, parcel); 352 onDeliveryState(State.PREFIX_DELIVERED); 353 } 354 if (hasMessageData) { 355 handleMessageData(flags, index, parcel); 356 } 357 if (hasSuffix) { 358 handleSuffix(flags, parcel); 359 suffixTransactionIndex = index; 360 suffixAvailable = true; 361 } 362 if (index == firstQueuedTransactionIndex) { 363 if (queuedTransactionData == null) { 364 // This message was in order, and we haven't needed to queue anything yet. 365 firstQueuedTransactionIndex += 1; 366 } else if (!hasMessageData && !hasSuffix) { 367 // The first transaction arrived, but it contained no message data. 368 queuedTransactionData.remove(0); 369 firstQueuedTransactionIndex += 1; 370 } 371 } 372 reportInboundSize(parcel.dataSize()); 373 deliver(); 374 } catch (StatusException se) { 375 closeAbnormal(se.getStatus()); 376 } 377 } 378 379 @GuardedBy("this") handlePrefix(int flags, Parcel parcel)380 abstract void handlePrefix(int flags, Parcel parcel) throws StatusException; 381 382 @GuardedBy("this") handleSuffix(int flags, Parcel parcel)383 abstract void handleSuffix(int flags, Parcel parcel) throws StatusException; 384 385 @GuardedBy("this") handleMessageData(int flags, int index, Parcel parcel)386 private void handleMessageData(int flags, int index, Parcel parcel) throws StatusException { 387 InputStream stream = null; 388 byte[] block = null; 389 boolean lastBlockOfMessage = true; 390 int numBytes = 0; 391 if ((flags & TransactionUtils.FLAG_MESSAGE_DATA_IS_PARCELABLE) != 0) { 392 InboundParcelablePolicy policy = attributes.get(BinderTransport.INBOUND_PARCELABLE_POLICY); 393 if (policy == null || !policy.shouldAcceptParcelableMessages()) { 394 throw Status.PERMISSION_DENIED 395 .withDescription("Parcelable messages not allowed") 396 .asException(); 397 } 398 int startPos = parcel.dataPosition(); 399 stream = ParcelableInputStream.readFromParcel(parcel, getClass().getClassLoader()); 400 numBytes = parcel.dataPosition() - startPos; 401 } else { 402 numBytes = parcel.readInt(); 403 block = BlockPool.acquireBlock(numBytes); 404 if (numBytes > 0) { 405 parcel.readByteArray(block); 406 } 407 if ((flags & TransactionUtils.FLAG_MESSAGE_DATA_IS_PARTIAL) != 0) { 408 // Partial message. Ensure we have a message assembler. 409 lastBlockOfMessage = false; 410 } 411 } 412 if (queuedTransactionData == null) { 413 if (numReceivedMessages == 0 && lastBlockOfMessage && index == firstQueuedTransactionIndex) { 414 // Shortcut for when we receive a single message in one transaction. 415 checkState(firstMessage == null); 416 firstMessage = (stream != null) ? stream : new BlockInputStream(block); 417 reportInboundMessage(numBytes); 418 return; 419 } 420 queuedTransactionData = new ArrayList<>(16); 421 } 422 enqueueTransactionData(index, new TransactionData(stream, block, numBytes, lastBlockOfMessage)); 423 } 424 425 @GuardedBy("this") enqueueTransactionData(int index, TransactionData data)426 private void enqueueTransactionData(int index, TransactionData data) { 427 int offset = index - firstQueuedTransactionIndex; 428 if (offset < queuedTransactionData.size()) { 429 queuedTransactionData.set(offset, data); 430 lookForCompleteMessage(); 431 } else if (offset > queuedTransactionData.size()) { 432 do { 433 queuedTransactionData.add(null); 434 } while (offset > queuedTransactionData.size()); 435 queuedTransactionData.add(data); 436 } else { 437 queuedTransactionData.add(data); 438 lookForCompleteMessage(); 439 } 440 } 441 442 @GuardedBy("this") lookForCompleteMessage()443 private void lookForCompleteMessage() { 444 int numBytes = 0; 445 if (nextCompleteMessageEnd == 0) { 446 for (int i = 0; i < queuedTransactionData.size(); i++) { 447 TransactionData data = queuedTransactionData.get(i); 448 if (data == null) { 449 // Missing block. 450 return; 451 } else { 452 numBytes += data.numBytes; 453 if (data.lastBlockOfMessage) { 454 // Found a complete message. 455 nextCompleteMessageEnd = i + 1; 456 reportInboundMessage(numBytes); 457 return; 458 } 459 } 460 } 461 } 462 } 463 464 @Override 465 @Nullable next()466 public final synchronized InputStream next() { 467 InputStream stream = null; 468 if (firstMessage != null) { 469 stream = firstMessage; 470 firstMessage = null; 471 } else if (numRequestedMessages > 0 && messageAvailable()) { 472 stream = assembleNextMessage(); 473 } 474 if (stream != null) { 475 numRequestedMessages -= 1; 476 } else { 477 producingMessages = false; 478 if (receivedAllTransactions()) { 479 // That's the last of the messages delivered. 480 if (!isClosed()) { 481 onDeliveryState(State.ALL_MESSAGES_DELIVERED); 482 deliver(); 483 } 484 } 485 } 486 return stream; 487 } 488 489 @GuardedBy("this") assembleNextMessage()490 private InputStream assembleNextMessage() { 491 InputStream message; 492 int numBlocks = nextCompleteMessageEnd; 493 nextCompleteMessageEnd = 0; 494 int numBytes = 0; 495 if (numBlocks == 1) { 496 // Single block. 497 TransactionData data = queuedTransactionData.remove(0); 498 numBytes = data.numBytes; 499 if (data.stream != null) { 500 message = data.stream; 501 } else { 502 message = new BlockInputStream(data.block); 503 } 504 } else { 505 byte[][] blocks = new byte[numBlocks][]; 506 for (int i = 0; i < numBlocks; i++) { 507 TransactionData data = queuedTransactionData.remove(0); 508 blocks[i] = checkNotNull(data.block); 509 numBytes += blocks[i].length; 510 } 511 message = new BlockInputStream(blocks, numBytes); 512 } 513 firstQueuedTransactionIndex += numBlocks; 514 lookForCompleteMessage(); 515 return message; 516 } 517 518 // ------------------------------------ 519 // stats collection. 520 521 @GuardedBy("this") reportInboundSize(int size)522 private void reportInboundSize(int size) { 523 inboundDataSize += size; 524 if (statsTraceContext != null && inboundDataSize != 0) { 525 statsTraceContext.inboundWireSize(inboundDataSize); 526 statsTraceContext.inboundUncompressedSize(inboundDataSize); 527 inboundDataSize = 0; 528 } 529 } 530 531 @GuardedBy("this") reportInboundMessage(int numBytes)532 private void reportInboundMessage(int numBytes) { 533 checkNotNull(statsTraceContext); 534 statsTraceContext.inboundMessage(numReceivedMessages); 535 statsTraceContext.inboundMessageRead(numReceivedMessages, numBytes, numBytes); 536 numReceivedMessages += 1; 537 } 538 539 @Override toString()540 public synchronized String toString() { 541 return getClass().getSimpleName() 542 + "[SfxA=" 543 + suffixAvailable 544 + "/De=" 545 + deliveryState 546 + "/Msg=" 547 + messageAvailable() 548 + "/Lis=" 549 + (listener != null) 550 + "]"; 551 } 552 553 // ====================================== 554 // Client-side inbound transactions. 555 static final class ClientInbound extends Inbound<ClientStreamListener> { 556 557 private final boolean countsForInUse; 558 559 @Nullable 560 @GuardedBy("this") 561 private Status closeStatus; 562 563 @Nullable 564 @GuardedBy("this") 565 private Metadata trailers; 566 ClientInbound( BinderTransport transport, Attributes attributes, int callId, boolean countsForInUse)567 ClientInbound( 568 BinderTransport transport, Attributes attributes, int callId, boolean countsForInUse) { 569 super(transport, attributes, callId); 570 this.countsForInUse = countsForInUse; 571 } 572 573 @Override countsForInUse()574 boolean countsForInUse() { 575 return countsForInUse; 576 } 577 578 @Override 579 @GuardedBy("this") handlePrefix(int flags, Parcel parcel)580 protected void handlePrefix(int flags, Parcel parcel) throws StatusException { 581 Metadata headers = MetadataHelper.readMetadata(parcel, attributes); 582 statsTraceContext.clientInboundHeaders(); 583 listener.headersRead(headers); 584 } 585 586 @Override 587 @GuardedBy("this") handleSuffix(int flags, Parcel parcel)588 protected void handleSuffix(int flags, Parcel parcel) throws StatusException { 589 closeStatus = TransactionUtils.readStatus(flags, parcel); 590 trailers = MetadataHelper.readMetadata(parcel, attributes); 591 } 592 593 @Override 594 @GuardedBy("this") deliverSuffix()595 protected void deliverSuffix() { 596 statsTraceContext.clientInboundTrailers(trailers); 597 statsTraceContext.streamClosed(closeStatus); 598 onDeliveryState(State.CLOSED); 599 listener.closed(closeStatus, RpcProgress.PROCESSED, trailers); 600 unregister(); 601 } 602 603 @Override 604 @GuardedBy("this") deliverCloseAbnormal(Status status)605 protected void deliverCloseAbnormal(Status status) { 606 listener.closed(status, RpcProgress.PROCESSED, new Metadata()); 607 } 608 } 609 610 // ====================================== 611 // Server-side inbound transactions. 612 static final class ServerInbound extends Inbound<ServerStreamListener> { 613 614 private final BinderTransport.BinderServerTransport serverTransport; 615 ServerInbound( BinderTransport.BinderServerTransport transport, Attributes attributes, int callId)616 ServerInbound( 617 BinderTransport.BinderServerTransport transport, Attributes attributes, int callId) { 618 super(transport, attributes, callId); 619 this.serverTransport = transport; 620 } 621 622 @GuardedBy("this") 623 @Override handlePrefix(int flags, Parcel parcel)624 protected void handlePrefix(int flags, Parcel parcel) throws StatusException { 625 String methodName = parcel.readString(); 626 Metadata headers = MetadataHelper.readMetadata(parcel, attributes); 627 628 StatsTraceContext statsTraceContext = 629 serverTransport.createStatsTraceContext(methodName, headers); 630 Outbound.ServerOutbound outbound = 631 new Outbound.ServerOutbound(serverTransport, callId, statsTraceContext); 632 ServerStream stream; 633 if ((flags & TransactionUtils.FLAG_EXPECT_SINGLE_MESSAGE) != 0) { 634 stream = new SingleMessageServerStream(this, outbound, attributes); 635 } else { 636 stream = new MultiMessageServerStream(this, outbound, attributes); 637 } 638 Status status = serverTransport.startStream(stream, methodName, headers); 639 if (status.isOk()) { 640 checkNotNull(listener); // Is it ok to assume this will happen synchronously? 641 if (transport.isReady()) { 642 listener.onReady(); 643 } 644 } else { 645 closeAbnormal(status); 646 } 647 } 648 649 @GuardedBy("this") 650 @Override handleSuffix(int flags, Parcel parcel)651 protected void handleSuffix(int flags, Parcel parcel) { 652 // Nothing to read. 653 } 654 655 @Override 656 @GuardedBy("this") deliverSuffix()657 protected void deliverSuffix() { 658 listener.halfClosed(); 659 } 660 661 @Override 662 @GuardedBy("this") deliverCloseAbnormal(Status status)663 protected void deliverCloseAbnormal(Status status) { 664 listener.closed(status); 665 } 666 667 @GuardedBy("this") onCloseSent(Status status)668 void onCloseSent(Status status) { 669 if (!isClosed()) { 670 onDeliveryState(State.CLOSED); 671 statsTraceContext.streamClosed(status); 672 listener.closed(Status.OK); 673 } 674 } 675 } 676 677 // ====================================== 678 // Helper methods. 679 checkTransition(State current, State next)680 private static void checkTransition(State current, State next) { 681 switch (next) { 682 case INITIALIZED: 683 checkState(current == State.UNINITIALIZED, "%s -> %s", current, next); 684 break; 685 case PREFIX_DELIVERED: 686 checkState( 687 current == State.INITIALIZED || current == State.UNINITIALIZED, 688 "%s -> %s", 689 current, 690 next); 691 break; 692 case ALL_MESSAGES_DELIVERED: 693 checkState(current == State.PREFIX_DELIVERED, "%s -> %s", current, next); 694 break; 695 case SUFFIX_DELIVERED: 696 checkState(current == State.ALL_MESSAGES_DELIVERED, "%s -> %s", current, next); 697 break; 698 case CLOSED: 699 break; 700 default: 701 throw new AssertionError(); 702 } 703 } 704 705 // ====================================== 706 // Message reassembly. 707 708 /** Part of an unconsumed message. */ 709 private static final class TransactionData { 710 @Nullable final InputStream stream; 711 @Nullable final byte[] block; 712 final int numBytes; 713 final boolean lastBlockOfMessage; 714 TransactionData(InputStream stream, byte[] block, int numBytes, boolean lastBlockOfMessage)715 TransactionData(InputStream stream, byte[] block, int numBytes, boolean lastBlockOfMessage) { 716 this.stream = stream; 717 this.block = block; 718 this.numBytes = numBytes; 719 this.lastBlockOfMessage = lastBlockOfMessage; 720 } 721 722 @Override toString()723 public String toString() { 724 return "TransactionData[" 725 + numBytes 726 + "b " 727 + (stream != null ? "stream" : "array") 728 + (lastBlockOfMessage ? "(last)]" : "]"); 729 } 730 } 731 } 732