• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.binder.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
22 import static java.lang.Math.max;
23 
24 import android.os.Parcel;
25 import io.grpc.Deadline;
26 import io.grpc.Metadata;
27 import io.grpc.MethodDescriptor;
28 import io.grpc.Status;
29 import io.grpc.StatusException;
30 import io.grpc.internal.StatsTraceContext;
31 import java.io.IOException;
32 import java.io.InputStream;
33 import java.util.Queue;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.TimeUnit;
36 import javax.annotation.Nullable;
37 import javax.annotation.concurrent.GuardedBy;
38 
39 /**
40  * Sends the set of outbound transactions for a single BinderStream (rpc).
41  *
42  * <p>Handles buffering internally for flow control, and splitting large messages into multiple
43  * transactions where necessary.
44  *
45  * <p>Also handles reporting to the {@link StatsTraceContext}.
46  *
47  * <p>A note on threading: All calls into this class are expected to hold this object as a lock.
48  * However, since calls from gRPC are serialized already, the only reason we need to care about
49  * threading is the onTransportReady() call (when flow-control unblocks us).
50  *
51  * <p>To reduce the cost of locking, BinderStream endeavors to make only a single call to this class
52  * for single-message calls (the most common).
53  *
54  * <p><b>IMPORTANT:</b> To avoid potential deadlocks, this class may only call unsynchronized
55  * methods of the BinderTransport class.
56  */
57 abstract class Outbound {
58 
59   private final BinderTransport transport;
60   private final int callId;
61   private final StatsTraceContext statsTraceContext;
62 
63   enum State {
64     INITIAL,
65     PREFIX_SENT,
66     ALL_MESSAGES_SENT,
67     SUFFIX_SENT,
68     CLOSED,
69   }
70 
71   /*
72    * Represents the state of data we've sent in binder transactions.
73    */
74   @GuardedBy("this")
75   private State outboundState = State.INITIAL; // Represents what we've delivered.
76 
77   // ----------------------------------
78   // For reporting to StatsTraceContext.
79   /** Indicates we're ready to send the prefix. */
80   private boolean prefixReady;
81 
82   @Nullable private InputStream firstMessage;
83 
84   @Nullable private Queue<InputStream> messageQueue;
85 
86   /**
87    * Indicates we have everything ready to send the suffix. This implies we have all outgoing
88    * messages, and any additional data which needs to be send after the last message. (e.g.
89    * trailers).
90    */
91   private boolean suffixReady;
92 
93   /**
94    * The index of the next transaction we'll send, allowing the receiver to re-assemble out-of-order
95    * messages.
96    */
97   @GuardedBy("this")
98   private int transactionIndex;
99 
100   // ----------------------------------
101   // For reporting to StatsTraceContext.
102   private int numDeliveredMessages;
103   private int messageSize;
104 
Outbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext)105   private Outbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext) {
106     this.transport = transport;
107     this.callId = callId;
108     this.statsTraceContext = statsTraceContext;
109   }
110 
getStatsTraceContext()111   final StatsTraceContext getStatsTraceContext() {
112     return statsTraceContext;
113   }
114 
115   /** Call to add a message to be delivered. Implies onPrefixReady(). */
116   @GuardedBy("this")
addMessage(InputStream message)117   final void addMessage(InputStream message) throws StatusException {
118     onPrefixReady(); // This is implied.
119     if (messageQueue != null) {
120       messageQueue.add(message);
121     } else if (firstMessage == null) {
122       firstMessage = message;
123     } else {
124       messageQueue = new ConcurrentLinkedQueue<>();
125       messageQueue.add(message);
126     }
127   }
128 
129   @GuardedBy("this")
onPrefixReady()130   protected final void onPrefixReady() {
131     this.prefixReady = true;
132   }
133 
134   @GuardedBy("this")
onSuffixReady()135   protected final void onSuffixReady() {
136     this.suffixReady = true;
137   }
138 
139   // =====================
140   // Updates to delivery.
141   @GuardedBy("this")
onOutboundState(State outboundState)142   private void onOutboundState(State outboundState) {
143     checkTransition(this.outboundState, outboundState);
144     this.outboundState = outboundState;
145   }
146 
147   // ===================
148   // Internals.
149   @GuardedBy("this")
messageAvailable()150   protected final boolean messageAvailable() {
151     if (messageQueue != null) {
152       return !messageQueue.isEmpty();
153     } else if (firstMessage != null) {
154       return numDeliveredMessages == 0;
155     } else {
156       return false;
157     }
158   }
159 
160   @Nullable
161   @GuardedBy("this")
peekNextMessage()162   private final InputStream peekNextMessage() {
163     if (numDeliveredMessages == 0) {
164       return firstMessage;
165     } else if (messageQueue != null) {
166       return messageQueue.peek();
167     }
168     return null;
169   }
170 
171   @GuardedBy("this")
canSend()172   private final boolean canSend() {
173     switch (outboundState) {
174       case INITIAL:
175         if (!prefixReady) {
176           return false;
177         }
178         break;
179       case PREFIX_SENT:
180         // We can only send something if we have messages or the suffix.
181         // Note that if we have the suffix but no messages in this state, it means we've been closed
182         // early.
183         if (!messageAvailable() && !suffixReady) {
184           return false;
185         }
186         break;
187       case ALL_MESSAGES_SENT:
188         if (!suffixReady) {
189           return false;
190         }
191         break;
192       default:
193         return false;
194     }
195     return isReady();
196   }
197 
isReady()198   final boolean isReady() {
199     return transport.isReady();
200   }
201 
202   @GuardedBy("this")
onTransportReady()203   final void onTransportReady() throws StatusException {
204     // The transport has become ready, attempt sending.
205     send();
206   }
207 
208   @GuardedBy("this")
send()209   final void send() throws StatusException {
210     while (canSend()) {
211       try {
212         sendInternal();
213       } catch (StatusException se) {
214         // Ensure we don't send anything else and rethrow.
215         onOutboundState(State.CLOSED);
216         throw se;
217       }
218     }
219   }
220 
221   @GuardedBy("this")
222   @SuppressWarnings("fallthrough")
sendInternal()223   protected final void sendInternal() throws StatusException {
224     try (ParcelHolder parcel = ParcelHolder.obtain()) {
225       int flags = 0;
226       parcel.get().writeInt(0); // Placeholder for flags. Will be filled in below.
227       parcel.get().writeInt(transactionIndex++);
228       switch (outboundState) {
229         case INITIAL:
230           flags |= TransactionUtils.FLAG_PREFIX;
231           flags |= writePrefix(parcel.get());
232           onOutboundState(State.PREFIX_SENT);
233           if (!messageAvailable() && !suffixReady) {
234             break;
235           }
236           // Fall-through.
237         case PREFIX_SENT:
238           InputStream messageStream = peekNextMessage();
239           if (messageStream != null) {
240             flags |= TransactionUtils.FLAG_MESSAGE_DATA;
241             flags |= writeMessageData(parcel.get(), messageStream);
242           } else {
243             checkState(suffixReady);
244           }
245           if (suffixReady && !messageAvailable()) {
246             onOutboundState(State.ALL_MESSAGES_SENT);
247           } else {
248             // There's still more message data to deliver, break out.
249             break;
250           }
251           // Fall-through.
252         case ALL_MESSAGES_SENT:
253           flags |= TransactionUtils.FLAG_SUFFIX;
254           flags |= writeSuffix(parcel.get());
255           onOutboundState(State.SUFFIX_SENT);
256           break;
257         default:
258           throw new AssertionError();
259       }
260       TransactionUtils.fillInFlags(parcel.get(), flags);
261       int dataSize = parcel.get().dataSize();
262       transport.sendTransaction(callId, parcel);
263       statsTraceContext.outboundWireSize(dataSize);
264       statsTraceContext.outboundUncompressedSize(dataSize);
265     } catch (IOException e) {
266       throw Status.INTERNAL.withCause(e).asException();
267     }
268   }
269 
unregister()270   protected final void unregister() {
271     transport.unregisterCall(callId);
272   }
273 
274   @Override
toString()275   public synchronized String toString() {
276     return getClass().getSimpleName()
277         + "[S="
278         + outboundState
279         + "/NDM="
280         + numDeliveredMessages
281         + "]";
282   }
283 
284   /**
285    * Write prefix data to the given {@link Parcel}.
286    *
287    * @param parcel the transaction parcel to write to.
288    * @return any additional flags to be set on the transaction.
289    */
290   @GuardedBy("this")
writePrefix(Parcel parcel)291   protected abstract int writePrefix(Parcel parcel) throws IOException, StatusException;
292 
293   /**
294    * Write suffix data to the given {@link Parcel}.
295    *
296    * @param parcel the transaction parcel to write to.
297    * @return any additional flags to be set on the transaction.
298    */
299   @GuardedBy("this")
writeSuffix(Parcel parcel)300   protected abstract int writeSuffix(Parcel parcel) throws IOException, StatusException;
301 
302   @GuardedBy("this")
writeMessageData(Parcel parcel, InputStream stream)303   private final int writeMessageData(Parcel parcel, InputStream stream) throws IOException {
304     int flags = 0;
305     boolean dataRemaining = false;
306     if (stream instanceof ParcelableInputStream) {
307       flags |= TransactionUtils.FLAG_MESSAGE_DATA_IS_PARCELABLE;
308       messageSize = ((ParcelableInputStream) stream).writeToParcel(parcel);
309     } else {
310       byte[] block = BlockPool.acquireBlock();
311       try {
312         int size = stream.read(block);
313         if (size <= 0) {
314           parcel.writeInt(0);
315         } else {
316           parcel.writeInt(size);
317           parcel.writeByteArray(block, 0, size);
318           messageSize += size;
319           if (size == block.length) {
320             flags |= TransactionUtils.FLAG_MESSAGE_DATA_IS_PARTIAL;
321             dataRemaining = true;
322           }
323         }
324       } finally {
325         BlockPool.releaseBlock(block);
326       }
327     }
328     if (!dataRemaining) {
329       stream.close();
330       int index = numDeliveredMessages++;
331       if (index > 0) {
332         checkNotNull(messageQueue).poll();
333       }
334       statsTraceContext.outboundMessage(index);
335       statsTraceContext.outboundMessageSent(index, messageSize, messageSize);
336       messageSize = 0;
337     }
338     return flags;
339   }
340 
341   // ======================================
342   // Client-side outbound transactions.
343   static final class ClientOutbound extends Outbound {
344 
345     private final MethodDescriptor<?, ?> method;
346     private final Metadata headers;
347     private final StatsTraceContext statsTraceContext;
348 
ClientOutbound( BinderTransport transport, int callId, MethodDescriptor<?, ?> method, Metadata headers, StatsTraceContext statsTraceContext)349     ClientOutbound(
350         BinderTransport transport,
351         int callId,
352         MethodDescriptor<?, ?> method,
353         Metadata headers,
354         StatsTraceContext statsTraceContext) {
355       super(transport, callId, statsTraceContext);
356       this.method = method;
357       this.headers = headers;
358       this.statsTraceContext = statsTraceContext;
359     }
360 
361     @Override
362     @GuardedBy("this")
writePrefix(Parcel parcel)363     protected int writePrefix(Parcel parcel) throws IOException, StatusException {
364       parcel.writeString(method.getFullMethodName());
365       MetadataHelper.writeMetadata(parcel, headers);
366       statsTraceContext.clientOutboundHeaders();
367       if (method.getType().serverSendsOneMessage()) {
368         return TransactionUtils.FLAG_EXPECT_SINGLE_MESSAGE;
369       }
370       return 0;
371     }
372 
373     // Implies onPrefixReady() and onSuffixReady().
374     @GuardedBy("this")
sendSingleMessageAndHalfClose(@ullable InputStream singleMessage)375     void sendSingleMessageAndHalfClose(@Nullable InputStream singleMessage) throws StatusException {
376       if (singleMessage != null) {
377         addMessage(singleMessage);
378       }
379       onSuffixReady();
380       send();
381     }
382 
383     @GuardedBy("this")
sendHalfClose()384     void sendHalfClose() throws StatusException {
385       onSuffixReady();
386       send();
387     }
388 
389     @Override
390     @GuardedBy("this")
writeSuffix(Parcel parcel)391     protected int writeSuffix(Parcel parcel) throws IOException {
392       // Client doesn't include anything in the suffix.
393       return 0;
394     }
395 
396     // Must not be called after onPrefixReady() (explicitly or via another method that implies it).
397     @GuardedBy("this")
setDeadline(Deadline deadline)398     void setDeadline(Deadline deadline) {
399       headers.discardAll(TIMEOUT_KEY);
400       long effectiveTimeoutNanos = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
401       headers.put(TIMEOUT_KEY, effectiveTimeoutNanos);
402     }
403   }
404 
405   // ======================================
406   // Server-side outbound transactions.
407   static final class ServerOutbound extends Outbound {
408     @GuardedBy("this")
409     @Nullable
410     private Metadata headers;
411 
412     @GuardedBy("this")
413     @Nullable
414     private Status closeStatus;
415 
416     @GuardedBy("this")
417     @Nullable
418     private Metadata trailers;
419 
ServerOutbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext)420     ServerOutbound(BinderTransport transport, int callId, StatsTraceContext statsTraceContext) {
421       super(transport, callId, statsTraceContext);
422     }
423 
424     @GuardedBy("this")
sendHeaders(Metadata headers)425     void sendHeaders(Metadata headers) throws StatusException {
426       this.headers = headers;
427       onPrefixReady();
428       send();
429     }
430 
431     @Override
432     @GuardedBy("this")
writePrefix(Parcel parcel)433     protected int writePrefix(Parcel parcel) throws IOException, StatusException {
434       MetadataHelper.writeMetadata(parcel, headers);
435       return 0;
436     }
437 
438     @GuardedBy("this")
sendSingleMessageAndClose( @ullable Metadata pendingHeaders, @Nullable InputStream pendingSingleMessage, Status closeStatus, Metadata trailers)439     void sendSingleMessageAndClose(
440         @Nullable Metadata pendingHeaders,
441         @Nullable InputStream pendingSingleMessage,
442         Status closeStatus,
443         Metadata trailers)
444         throws StatusException {
445       if (this.closeStatus != null) {
446         return;
447       }
448       if (pendingHeaders != null) {
449         this.headers = pendingHeaders;
450       }
451       onPrefixReady();
452       if (pendingSingleMessage != null) {
453         addMessage(pendingSingleMessage);
454       }
455       checkState(this.trailers == null);
456       this.closeStatus = closeStatus;
457       this.trailers = trailers;
458       onSuffixReady();
459       send();
460     }
461 
462     @GuardedBy("this")
sendClose(Status closeStatus, Metadata trailers)463     void sendClose(Status closeStatus, Metadata trailers) throws StatusException {
464       if (this.closeStatus != null) {
465         return;
466       }
467       checkState(this.trailers == null);
468       this.closeStatus = closeStatus;
469       this.trailers = trailers;
470       onPrefixReady();
471       onSuffixReady();
472       send();
473     }
474 
475     @Override
476     @GuardedBy("this")
writeSuffix(Parcel parcel)477     protected int writeSuffix(Parcel parcel) throws IOException, StatusException {
478       int flags = TransactionUtils.writeStatus(parcel, closeStatus);
479       MetadataHelper.writeMetadata(parcel, trailers);
480       // TODO: This is an ugly place for this side-effect.
481       unregister();
482       return flags;
483     }
484   }
485 
486   // ======================================
487   // Helper methods.
checkTransition(State current, State next)488   private static void checkTransition(State current, State next) {
489     switch (next) {
490       case PREFIX_SENT:
491         checkState(current == State.INITIAL);
492         break;
493       case ALL_MESSAGES_SENT:
494         checkState(current == State.PREFIX_SENT);
495         break;
496       case SUFFIX_SENT:
497         checkState(current == State.ALL_MESSAGES_SENT);
498         break;
499       case CLOSED: // hah.
500         break;
501       default:
502         throw new AssertionError();
503     }
504   }
505 }
506