1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.binder.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkState; 21 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; 22 import static java.lang.Math.max; 23 24 import android.os.Parcel; 25 import io.grpc.Deadline; 26 import io.grpc.Metadata; 27 import io.grpc.MethodDescriptor; 28 import io.grpc.Status; 29 import io.grpc.StatusException; 30 import io.grpc.internal.StatsTraceContext; 31 import java.io.IOException; 32 import java.io.InputStream; 33 import java.util.Queue; 34 import java.util.concurrent.ConcurrentLinkedQueue; 35 import java.util.concurrent.TimeUnit; 36 import javax.annotation.Nullable; 37 import javax.annotation.concurrent.GuardedBy; 38 39 /** 40 * Sends the set of outbound transactions for a single BinderStream (rpc). 41 * 42 * <p>Handles buffering internally for flow control, and splitting large messages into multiple 43 * transactions where necessary. 44 * 45 * <p>Also handles reporting to the {@link StatsTraceContext}. 46 * 47 * <p>A note on threading: All calls into this class are expected to hold this object as a lock. 48 * However, since calls from gRPC are serialized already, the only reason we need to care about 49 * threading is the onTransportReady() call (when flow-control unblocks us). 50 * 51 * <p>To reduce the cost of locking, BinderStream endeavors to make only a single call to this class 52 * for single-message calls (the most common). 53 * 54 * <p><b>IMPORTANT:</b> To avoid potential deadlocks, this class may only call unsynchronized 55 * methods of the BinderTransport class. 56 */ 57 abstract class Outbound { 58 59 private final BinderTransport transport; 60 private final int callId; 61 private final StatsTraceContext statsTraceContext; 62 63 enum State { 64 INITIAL, 65 PREFIX_SENT, 66 ALL_MESSAGES_SENT, 67 SUFFIX_SENT, 68 CLOSED, 69 } 70 71 /* 72 * Represents the state of data we've sent in binder transactions. 73 */ 74 @GuardedBy("this") 75 private State outboundState = State.INITIAL; // Represents what we've delivered. 76 77 // ---------------------------------- 78 // For reporting to StatsTraceContext. 79 /** Indicates we're ready to send the prefix. */ 80 private boolean prefixReady; 81 82 @Nullable private InputStream firstMessage; 83 84 @Nullable private Queue<InputStream> messageQueue; 85 86 /** 87 * Indicates we have everything ready to send the suffix. This implies we have all outgoing 88 * messages, and any additional data which needs to be send after the last message. (e.g. 89 * trailers). 90 */ 91 private boolean suffixReady; 92 93 /** 94 * The index of the next transaction we'll send, allowing the receiver to re-assemble out-of-order 95 * messages. 96 */ 97 @GuardedBy("this") 98 private int transactionIndex; 99 100 // ---------------------------------- 101 // For reporting to StatsTraceContext. 102 private int numDeliveredMessages; 103 private int messageSize; 104 Outbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext)105 private Outbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext) { 106 this.transport = transport; 107 this.callId = callId; 108 this.statsTraceContext = statsTraceContext; 109 } 110 getStatsTraceContext()111 final StatsTraceContext getStatsTraceContext() { 112 return statsTraceContext; 113 } 114 115 /** Call to add a message to be delivered. Implies onPrefixReady(). */ 116 @GuardedBy("this") addMessage(InputStream message)117 final void addMessage(InputStream message) throws StatusException { 118 onPrefixReady(); // This is implied. 119 if (messageQueue != null) { 120 messageQueue.add(message); 121 } else if (firstMessage == null) { 122 firstMessage = message; 123 } else { 124 messageQueue = new ConcurrentLinkedQueue<>(); 125 messageQueue.add(message); 126 } 127 } 128 129 @GuardedBy("this") onPrefixReady()130 protected final void onPrefixReady() { 131 this.prefixReady = true; 132 } 133 134 @GuardedBy("this") onSuffixReady()135 protected final void onSuffixReady() { 136 this.suffixReady = true; 137 } 138 139 // ===================== 140 // Updates to delivery. 141 @GuardedBy("this") onOutboundState(State outboundState)142 private void onOutboundState(State outboundState) { 143 checkTransition(this.outboundState, outboundState); 144 this.outboundState = outboundState; 145 } 146 147 // =================== 148 // Internals. 149 @GuardedBy("this") messageAvailable()150 protected final boolean messageAvailable() { 151 if (messageQueue != null) { 152 return !messageQueue.isEmpty(); 153 } else if (firstMessage != null) { 154 return numDeliveredMessages == 0; 155 } else { 156 return false; 157 } 158 } 159 160 @Nullable 161 @GuardedBy("this") peekNextMessage()162 private final InputStream peekNextMessage() { 163 if (numDeliveredMessages == 0) { 164 return firstMessage; 165 } else if (messageQueue != null) { 166 return messageQueue.peek(); 167 } 168 return null; 169 } 170 171 @GuardedBy("this") canSend()172 private final boolean canSend() { 173 switch (outboundState) { 174 case INITIAL: 175 if (!prefixReady) { 176 return false; 177 } 178 break; 179 case PREFIX_SENT: 180 // We can only send something if we have messages or the suffix. 181 // Note that if we have the suffix but no messages in this state, it means we've been closed 182 // early. 183 if (!messageAvailable() && !suffixReady) { 184 return false; 185 } 186 break; 187 case ALL_MESSAGES_SENT: 188 if (!suffixReady) { 189 return false; 190 } 191 break; 192 default: 193 return false; 194 } 195 return isReady(); 196 } 197 isReady()198 final boolean isReady() { 199 return transport.isReady(); 200 } 201 202 @GuardedBy("this") onTransportReady()203 final void onTransportReady() throws StatusException { 204 // The transport has become ready, attempt sending. 205 send(); 206 } 207 208 @GuardedBy("this") send()209 final void send() throws StatusException { 210 while (canSend()) { 211 try { 212 sendInternal(); 213 } catch (StatusException se) { 214 // Ensure we don't send anything else and rethrow. 215 onOutboundState(State.CLOSED); 216 throw se; 217 } 218 } 219 } 220 221 @GuardedBy("this") 222 @SuppressWarnings("fallthrough") sendInternal()223 protected final void sendInternal() throws StatusException { 224 try (ParcelHolder parcel = ParcelHolder.obtain()) { 225 int flags = 0; 226 parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below. 227 parcel.get().writeInt(transactionIndex++); 228 switch (outboundState) { 229 case INITIAL: 230 flags |= TransactionUtils.FLAG_PREFIX; 231 flags |= writePrefix(parcel.get()); 232 onOutboundState(State.PREFIX_SENT); 233 if (!messageAvailable() && !suffixReady) { 234 break; 235 } 236 // Fall-through. 237 case PREFIX_SENT: 238 InputStream messageStream = peekNextMessage(); 239 if (messageStream != null) { 240 flags |= TransactionUtils.FLAG_MESSAGE_DATA; 241 flags |= writeMessageData(parcel.get(), messageStream); 242 } else { 243 checkState(suffixReady); 244 } 245 if (suffixReady && !messageAvailable()) { 246 onOutboundState(State.ALL_MESSAGES_SENT); 247 } else { 248 // There's still more message data to deliver, break out. 249 break; 250 } 251 // Fall-through. 252 case ALL_MESSAGES_SENT: 253 flags |= TransactionUtils.FLAG_SUFFIX; 254 flags |= writeSuffix(parcel.get()); 255 onOutboundState(State.SUFFIX_SENT); 256 break; 257 default: 258 throw new AssertionError(); 259 } 260 TransactionUtils.fillInFlags(parcel.get(), flags); 261 int dataSize = parcel.get().dataSize(); 262 transport.sendTransaction(callId, parcel); 263 statsTraceContext.outboundWireSize(dataSize); 264 statsTraceContext.outboundUncompressedSize(dataSize); 265 } catch (IOException e) { 266 throw Status.INTERNAL.withCause(e).asException(); 267 } 268 } 269 unregister()270 protected final void unregister() { 271 transport.unregisterCall(callId); 272 } 273 274 @Override toString()275 public synchronized String toString() { 276 return getClass().getSimpleName() 277 + "[S=" 278 + outboundState 279 + "/NDM=" 280 + numDeliveredMessages 281 + "]"; 282 } 283 284 /** 285 * Write prefix data to the given {@link Parcel}. 286 * 287 * @param parcel the transaction parcel to write to. 288 * @return any additional flags to be set on the transaction. 289 */ 290 @GuardedBy("this") writePrefix(Parcel parcel)291 protected abstract int writePrefix(Parcel parcel) throws IOException, StatusException; 292 293 /** 294 * Write suffix data to the given {@link Parcel}. 295 * 296 * @param parcel the transaction parcel to write to. 297 * @return any additional flags to be set on the transaction. 298 */ 299 @GuardedBy("this") writeSuffix(Parcel parcel)300 protected abstract int writeSuffix(Parcel parcel) throws IOException, StatusException; 301 302 @GuardedBy("this") writeMessageData(Parcel parcel, InputStream stream)303 private final int writeMessageData(Parcel parcel, InputStream stream) throws IOException { 304 int flags = 0; 305 boolean dataRemaining = false; 306 if (stream instanceof ParcelableInputStream) { 307 flags |= TransactionUtils.FLAG_MESSAGE_DATA_IS_PARCELABLE; 308 messageSize = ((ParcelableInputStream) stream).writeToParcel(parcel); 309 } else { 310 byte[] block = BlockPool.acquireBlock(); 311 try { 312 int size = stream.read(block); 313 if (size <= 0) { 314 parcel.writeInt(0); 315 } else { 316 parcel.writeInt(size); 317 parcel.writeByteArray(block, 0, size); 318 messageSize += size; 319 if (size == block.length) { 320 flags |= TransactionUtils.FLAG_MESSAGE_DATA_IS_PARTIAL; 321 dataRemaining = true; 322 } 323 } 324 } finally { 325 BlockPool.releaseBlock(block); 326 } 327 } 328 if (!dataRemaining) { 329 stream.close(); 330 int index = numDeliveredMessages++; 331 if (index > 0) { 332 checkNotNull(messageQueue).poll(); 333 } 334 statsTraceContext.outboundMessage(index); 335 statsTraceContext.outboundMessageSent(index, messageSize, messageSize); 336 messageSize = 0; 337 } 338 return flags; 339 } 340 341 // ====================================== 342 // Client-side outbound transactions. 343 static final class ClientOutbound extends Outbound { 344 345 private final MethodDescriptor<?, ?> method; 346 private final Metadata headers; 347 private final StatsTraceContext statsTraceContext; 348 ClientOutbound( BinderTransport transport, int callId, MethodDescriptor<?, ?> method, Metadata headers, StatsTraceContext statsTraceContext)349 ClientOutbound( 350 BinderTransport transport, 351 int callId, 352 MethodDescriptor<?, ?> method, 353 Metadata headers, 354 StatsTraceContext statsTraceContext) { 355 super(transport, callId, statsTraceContext); 356 this.method = method; 357 this.headers = headers; 358 this.statsTraceContext = statsTraceContext; 359 } 360 361 @Override 362 @GuardedBy("this") writePrefix(Parcel parcel)363 protected int writePrefix(Parcel parcel) throws IOException, StatusException { 364 parcel.writeString(method.getFullMethodName()); 365 MetadataHelper.writeMetadata(parcel, headers); 366 statsTraceContext.clientOutboundHeaders(); 367 if (method.getType().serverSendsOneMessage()) { 368 return TransactionUtils.FLAG_EXPECT_SINGLE_MESSAGE; 369 } 370 return 0; 371 } 372 373 // Implies onPrefixReady() and onSuffixReady(). 374 @GuardedBy("this") sendSingleMessageAndHalfClose(@ullable InputStream singleMessage)375 void sendSingleMessageAndHalfClose(@Nullable InputStream singleMessage) throws StatusException { 376 if (singleMessage != null) { 377 addMessage(singleMessage); 378 } 379 onSuffixReady(); 380 send(); 381 } 382 383 @GuardedBy("this") sendHalfClose()384 void sendHalfClose() throws StatusException { 385 onSuffixReady(); 386 send(); 387 } 388 389 @Override 390 @GuardedBy("this") writeSuffix(Parcel parcel)391 protected int writeSuffix(Parcel parcel) throws IOException { 392 // Client doesn't include anything in the suffix. 393 return 0; 394 } 395 396 // Must not be called after onPrefixReady() (explicitly or via another method that implies it). 397 @GuardedBy("this") setDeadline(Deadline deadline)398 void setDeadline(Deadline deadline) { 399 headers.discardAll(TIMEOUT_KEY); 400 long effectiveTimeoutNanos = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS)); 401 headers.put(TIMEOUT_KEY, effectiveTimeoutNanos); 402 } 403 } 404 405 // ====================================== 406 // Server-side outbound transactions. 407 static final class ServerOutbound extends Outbound { 408 @GuardedBy("this") 409 @Nullable 410 private Metadata headers; 411 412 @GuardedBy("this") 413 @Nullable 414 private Status closeStatus; 415 416 @GuardedBy("this") 417 @Nullable 418 private Metadata trailers; 419 ServerOutbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext)420 ServerOutbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext) { 421 super(transport, callId, statsTraceContext); 422 } 423 424 @GuardedBy("this") sendHeaders(Metadata headers)425 void sendHeaders(Metadata headers) throws StatusException { 426 this.headers = headers; 427 onPrefixReady(); 428 send(); 429 } 430 431 @Override 432 @GuardedBy("this") writePrefix(Parcel parcel)433 protected int writePrefix(Parcel parcel) throws IOException, StatusException { 434 MetadataHelper.writeMetadata(parcel, headers); 435 return 0; 436 } 437 438 @GuardedBy("this") sendSingleMessageAndClose( @ullable Metadata pendingHeaders, @Nullable InputStream pendingSingleMessage, Status closeStatus, Metadata trailers)439 void sendSingleMessageAndClose( 440 @Nullable Metadata pendingHeaders, 441 @Nullable InputStream pendingSingleMessage, 442 Status closeStatus, 443 Metadata trailers) 444 throws StatusException { 445 if (this.closeStatus != null) { 446 return; 447 } 448 if (pendingHeaders != null) { 449 this.headers = pendingHeaders; 450 } 451 onPrefixReady(); 452 if (pendingSingleMessage != null) { 453 addMessage(pendingSingleMessage); 454 } 455 checkState(this.trailers == null); 456 this.closeStatus = closeStatus; 457 this.trailers = trailers; 458 onSuffixReady(); 459 send(); 460 } 461 462 @GuardedBy("this") sendClose(Status closeStatus, Metadata trailers)463 void sendClose(Status closeStatus, Metadata trailers) throws StatusException { 464 if (this.closeStatus != null) { 465 return; 466 } 467 checkState(this.trailers == null); 468 this.closeStatus = closeStatus; 469 this.trailers = trailers; 470 onPrefixReady(); 471 onSuffixReady(); 472 send(); 473 } 474 475 @Override 476 @GuardedBy("this") writeSuffix(Parcel parcel)477 protected int writeSuffix(Parcel parcel) throws IOException, StatusException { 478 int flags = TransactionUtils.writeStatus(parcel, closeStatus); 479 MetadataHelper.writeMetadata(parcel, trailers); 480 // TODO: This is an ugly place for this side-effect. 481 unregister(); 482 return flags; 483 } 484 } 485 486 // ====================================== 487 // Helper methods. checkTransition(State current, State next)488 private static void checkTransition(State current, State next) { 489 switch (next) { 490 case PREFIX_SENT: 491 checkState(current == State.INITIAL); 492 break; 493 case ALL_MESSAGES_SENT: 494 checkState(current == State.PREFIX_SENT); 495 break; 496 case SUFFIX_SENT: 497 checkState(current == State.ALL_MESSAGES_SENT); 498 break; 499 case CLOSED: // hah. 500 break; 501 default: 502 throw new AssertionError(); 503 } 504 } 505 } 506