• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.binder.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import android.os.Parcel;
23 import io.grpc.Attributes;
24 import io.grpc.Metadata;
25 import io.grpc.Status;
26 import io.grpc.StatusException;
27 import io.grpc.binder.InboundParcelablePolicy;
28 import io.grpc.internal.ClientStreamListener;
29 import io.grpc.internal.ClientStreamListener.RpcProgress;
30 import io.grpc.internal.ServerStream;
31 import io.grpc.internal.ServerStreamListener;
32 import io.grpc.internal.StatsTraceContext;
33 import io.grpc.internal.StreamListener;
34 import java.io.InputStream;
35 import java.util.ArrayList;
36 import javax.annotation.Nullable;
37 import javax.annotation.concurrent.GuardedBy;
38 
39 /**
40  * Handles incoming binder transactions for a single stream, turning those transactions into calls
41  * to the stream listener.
42  *
43  * <p>Out-of-order messages are reassembled into their correct order.
44  */
45 abstract class Inbound<L extends StreamListener> implements StreamListener.MessageProducer {
46 
47   protected final BinderTransport transport;
48   protected final Attributes attributes;
49   final int callId;
50 
51   // ==========================
52   // Values set when we're initialized.
53 
54   @Nullable
55   @GuardedBy("this")
56   protected Outbound outbound;
57 
58   @Nullable
59   @GuardedBy("this")
60   protected StatsTraceContext statsTraceContext;
61 
62   @Nullable
63   @GuardedBy("this")
64   protected L listener;
65 
66   // ==========================
67   // State of inbound data.
68 
69   @Nullable
70   @GuardedBy("this")
71   private InputStream firstMessage;
72 
73   @GuardedBy("this")
74   private int firstQueuedTransactionIndex;
75 
76   @GuardedBy("this")
77   private int nextCompleteMessageEnd;
78 
79   @Nullable
80   @GuardedBy("this")
81   private ArrayList<TransactionData> queuedTransactionData;
82 
83   @GuardedBy("this")
84   private boolean suffixAvailable;
85 
86   @GuardedBy("this")
87   private int suffixTransactionIndex;
88 
89   @GuardedBy("this")
90   private int inboundDataSize;
91 
92   // ==========================
93   // State of what we've delivered to gRPC.
94 
95   /**
96    * Each rpc transmits (or receives) a prefix (including headers and possibly a method name), the
97    * data of zero or more request (or response) messages, and a suffix (possibly including a close
98    * status and trailers).
99    *
100    * <p>This enum represents those stages, for both availability (what we've been given), and
101    * delivery what we've sent.
102    */
103   enum State {
104     // We aren't yet connected to a BinderStream instance and listener. Due to potentially
105     // out-of-order messages, a server-side instance can remain in this state for multiple
106     // transactions.
107     UNINITIALIZED,
108 
109     // We're attached to a BinderStream instance and we have a listener we can report to.
110     // On the client-side, this happens as soon as the start() method is called (almost
111     // immediately), and on the server side, this happens as soon as we receive the prefix
112     // (so we know which method is being called).
113     INITIALIZED,
114 
115     // We've delivered the prefix data to the listener. On the client side, this means we've
116     // delivered the response headers, and on the server side this state is effectively the same
117     // as INITIALIZED (since we initialize only by delivering the prefix).
118     PREFIX_DELIVERED,
119 
120     // All messages have been received, and delivered to the listener.
121     ALL_MESSAGES_DELIVERED,
122 
123     // We've delivered the suffix.
124     SUFFIX_DELIVERED,
125 
126     // The stream is closed.
127     CLOSED
128   }
129 
130   /*
131    * Represents which data we've delivered to the gRPC listener.
132    */
133   @GuardedBy("this")
134   private State deliveryState = State.UNINITIALIZED;
135 
136   @GuardedBy("this")
137   private int numReceivedMessages;
138 
139   @GuardedBy("this")
140   private int numRequestedMessages;
141 
142   @GuardedBy("this")
143   private boolean delivering;
144 
145   @GuardedBy("this")
146   private boolean producingMessages;
147 
Inbound(BinderTransport transport, Attributes attributes, int callId)148   private Inbound(BinderTransport transport, Attributes attributes, int callId) {
149     this.transport = transport;
150     this.attributes = attributes;
151     this.callId = callId;
152   }
153 
154   @GuardedBy("this")
init(Outbound outbound, L listener)155   final void init(Outbound outbound, L listener) {
156     this.outbound = outbound;
157     this.statsTraceContext = outbound.getStatsTraceContext();
158     this.listener = listener;
159     if (!isClosed()) {
160       onDeliveryState(State.INITIALIZED);
161     }
162   }
163 
unregister()164   final void unregister() {
165     transport.unregisterInbound(this);
166   }
167 
countsForInUse()168   boolean countsForInUse() {
169     return false;
170   }
171 
172   // =====================
173   // Updates to delivery.
174 
175   @GuardedBy("this")
onDeliveryState(State deliveryState)176   protected final void onDeliveryState(State deliveryState) {
177     checkTransition(this.deliveryState, deliveryState);
178     this.deliveryState = deliveryState;
179   }
180 
181   @GuardedBy("this")
isClosed()182   protected final boolean isClosed() {
183     return deliveryState == State.CLOSED;
184   }
185 
186   @GuardedBy("this")
messageAvailable()187   private final boolean messageAvailable() {
188     return firstMessage != null || nextCompleteMessageEnd > 0;
189   }
190 
191   @GuardedBy("this")
receivedAllTransactions()192   private boolean receivedAllTransactions() {
193     return suffixAvailable && firstQueuedTransactionIndex >= suffixTransactionIndex;
194   }
195 
196   // ===================
197   // Internals.
198 
199   @GuardedBy("this")
deliver()200   final void deliver() {
201     if (delivering) {
202       // Don't re-enter.
203       return;
204     }
205     delivering = true;
206     while (canDeliver()) {
207       deliverInternal();
208     }
209     delivering = false;
210   }
211 
212   @GuardedBy("this")
canDeliver()213   private final boolean canDeliver() {
214     switch (deliveryState) {
215       case PREFIX_DELIVERED:
216         if (listener != null) {
217           if (producingMessages) {
218             // We're waiting for the listener to consume messages. Nothing to do.
219             return false;
220           } else if (messageAvailable()) {
221             // There's a message. We can deliver if we've been asked for messages, and we haven't
222             // already given the listener a MessageProducer.
223             return numRequestedMessages != 0;
224           } else {
225             // There are no messages available. Return true if that's the last of them, because we
226             // can send the suffix.
227             return receivedAllTransactions();
228           }
229         }
230         return false;
231       case ALL_MESSAGES_DELIVERED:
232         return listener != null && suffixAvailable;
233       default:
234         return false;
235     }
236   }
237 
238   @GuardedBy("this")
239   @SuppressWarnings("fallthrough")
deliverInternal()240   private final void deliverInternal() {
241     switch (deliveryState) {
242       case PREFIX_DELIVERED:
243         if (producingMessages) {
244           break;
245         } else if (messageAvailable()) {
246           producingMessages = true;
247           listener.messagesAvailable(this);
248           break;
249         } else if (!suffixAvailable) {
250           break;
251         }
252         onDeliveryState(State.ALL_MESSAGES_DELIVERED);
253         // Fall-through.
254       case ALL_MESSAGES_DELIVERED:
255         if (suffixAvailable) {
256           onDeliveryState(State.SUFFIX_DELIVERED);
257           deliverSuffix();
258         }
259         break;
260       default:
261         throw new AssertionError();
262     }
263   }
264 
265   /** Deliver the suffix to gRPC. */
deliverSuffix()266   protected abstract void deliverSuffix();
267 
268   @GuardedBy("this")
closeOnCancel(Status status)269   final void closeOnCancel(Status status) {
270     closeAbnormal(Status.CANCELLED, status, false);
271   }
272 
273   @GuardedBy("this")
closeOutOfBand(Status status)274   private final void closeOutOfBand(Status status) {
275     closeAbnormal(status, status, true);
276   }
277 
278   @GuardedBy("this")
closeAbnormal(Status status)279   final void closeAbnormal(Status status) {
280     closeAbnormal(status, status, false);
281   }
282 
283   @GuardedBy("this")
closeAbnormal( Status outboundStatus, Status internalStatus, boolean isOobFromRemote)284   private final void closeAbnormal(
285       Status outboundStatus, Status internalStatus, boolean isOobFromRemote) {
286     if (!isClosed()) {
287       boolean wasInitialized = (deliveryState != State.UNINITIALIZED);
288       onDeliveryState(State.CLOSED);
289       if (wasInitialized) {
290         statsTraceContext.streamClosed(internalStatus);
291       }
292       if (!isOobFromRemote) {
293         transport.sendOutOfBandClose(callId, outboundStatus);
294       }
295       if (wasInitialized) {
296         deliverCloseAbnormal(internalStatus);
297       }
298       unregister();
299     }
300   }
301 
302   @GuardedBy("this")
deliverCloseAbnormal(Status status)303   protected abstract void deliverCloseAbnormal(Status status);
304 
onTransportReady()305   final void onTransportReady() {
306     // Report transport readiness to the listener, and the outbound data.
307     Outbound outbound = null;
308     StreamListener listener = null;
309     synchronized (this) {
310       outbound = this.outbound;
311       listener = this.listener;
312     }
313     if (listener != null) {
314       listener.onReady();
315     }
316     if (outbound != null) {
317       try {
318         synchronized (outbound) {
319           outbound.onTransportReady();
320         }
321       } catch (StatusException se) {
322         synchronized (this) {
323           closeAbnormal(se.getStatus());
324         }
325       }
326     }
327   }
328 
329   @GuardedBy("this")
requestMessages(int num)330   public void requestMessages(int num) {
331     numRequestedMessages += num;
332     deliver();
333   }
334 
handleTransaction(Parcel parcel)335   final synchronized void handleTransaction(Parcel parcel) {
336     if (isClosed()) {
337       return;
338     }
339     try {
340       int flags = parcel.readInt();
341       if (TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_OUT_OF_BAND_CLOSE)) {
342         closeOutOfBand(TransactionUtils.readStatus(flags, parcel));
343         return;
344       }
345       int index = parcel.readInt();
346       boolean hasPrefix = TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_PREFIX);
347       boolean hasMessageData =
348           TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_MESSAGE_DATA);
349       boolean hasSuffix = TransactionUtils.hasFlag(flags, TransactionUtils.FLAG_SUFFIX);
350       if (hasPrefix) {
351         handlePrefix(flags, parcel);
352         onDeliveryState(State.PREFIX_DELIVERED);
353       }
354       if (hasMessageData) {
355         handleMessageData(flags, index, parcel);
356       }
357       if (hasSuffix) {
358         handleSuffix(flags, parcel);
359         suffixTransactionIndex = index;
360         suffixAvailable = true;
361       }
362       if (index == firstQueuedTransactionIndex) {
363         if (queuedTransactionData == null) {
364           // This message was in order, and we haven't needed to queue anything yet.
365           firstQueuedTransactionIndex += 1;
366         } else if (!hasMessageData && !hasSuffix) {
367           // The first transaction arrived, but it contained no message data.
368           queuedTransactionData.remove(0);
369           firstQueuedTransactionIndex += 1;
370         }
371       }
372       reportInboundSize(parcel.dataSize());
373       deliver();
374     } catch (StatusException se) {
375       closeAbnormal(se.getStatus());
376     }
377   }
378 
379   @GuardedBy("this")
handlePrefix(int flags, Parcel parcel)380   abstract void handlePrefix(int flags, Parcel parcel) throws StatusException;
381 
382   @GuardedBy("this")
handleSuffix(int flags, Parcel parcel)383   abstract void handleSuffix(int flags, Parcel parcel) throws StatusException;
384 
385   @GuardedBy("this")
handleMessageData(int flags, int index, Parcel parcel)386   private void handleMessageData(int flags, int index, Parcel parcel) throws StatusException {
387     InputStream stream = null;
388     byte[] block = null;
389     boolean lastBlockOfMessage = true;
390     int numBytes = 0;
391     if ((flags & TransactionUtils.FLAG_MESSAGE_DATA_IS_PARCELABLE) != 0) {
392       InboundParcelablePolicy policy = attributes.get(BinderTransport.INBOUND_PARCELABLE_POLICY);
393       if (policy == null || !policy.shouldAcceptParcelableMessages()) {
394         throw Status.PERMISSION_DENIED
395             .withDescription("Parcelable messages not allowed")
396             .asException();
397       }
398       int startPos = parcel.dataPosition();
399       stream = ParcelableInputStream.readFromParcel(parcel, getClass().getClassLoader());
400       numBytes = parcel.dataPosition() - startPos;
401     } else {
402       numBytes = parcel.readInt();
403       block = BlockPool.acquireBlock(numBytes);
404       if (numBytes > 0) {
405         parcel.readByteArray(block);
406       }
407       if ((flags & TransactionUtils.FLAG_MESSAGE_DATA_IS_PARTIAL) != 0) {
408         // Partial message. Ensure we have a message assembler.
409         lastBlockOfMessage = false;
410       }
411     }
412     if (queuedTransactionData == null) {
413       if (numReceivedMessages == 0 && lastBlockOfMessage && index == firstQueuedTransactionIndex) {
414         // Shortcut for when we receive a single message in one transaction.
415         checkState(firstMessage == null);
416         firstMessage = (stream != null) ? stream : new BlockInputStream(block);
417         reportInboundMessage(numBytes);
418         return;
419       }
420       queuedTransactionData = new ArrayList<>(16);
421     }
422     enqueueTransactionData(index, new TransactionData(stream, block, numBytes, lastBlockOfMessage));
423   }
424 
425   @GuardedBy("this")
enqueueTransactionData(int index, TransactionData data)426   private void enqueueTransactionData(int index, TransactionData data) {
427     int offset = index - firstQueuedTransactionIndex;
428     if (offset < queuedTransactionData.size()) {
429       queuedTransactionData.set(offset, data);
430       lookForCompleteMessage();
431     } else if (offset > queuedTransactionData.size()) {
432       do {
433         queuedTransactionData.add(null);
434       } while (offset > queuedTransactionData.size());
435       queuedTransactionData.add(data);
436     } else {
437       queuedTransactionData.add(data);
438       lookForCompleteMessage();
439     }
440   }
441 
442   @GuardedBy("this")
lookForCompleteMessage()443   private void lookForCompleteMessage() {
444     int numBytes = 0;
445     if (nextCompleteMessageEnd == 0) {
446       for (int i = 0; i < queuedTransactionData.size(); i++) {
447         TransactionData data = queuedTransactionData.get(i);
448         if (data == null) {
449           // Missing block.
450           return;
451         } else {
452           numBytes += data.numBytes;
453           if (data.lastBlockOfMessage) {
454             // Found a complete message.
455             nextCompleteMessageEnd = i + 1;
456             reportInboundMessage(numBytes);
457             return;
458           }
459         }
460       }
461     }
462   }
463 
464   @Override
465   @Nullable
next()466   public final synchronized InputStream next() {
467     InputStream stream = null;
468     if (firstMessage != null) {
469       stream = firstMessage;
470       firstMessage = null;
471     } else if (numRequestedMessages > 0 && messageAvailable()) {
472       stream = assembleNextMessage();
473     }
474     if (stream != null) {
475       numRequestedMessages -= 1;
476     } else {
477       producingMessages = false;
478       if (receivedAllTransactions()) {
479         // That's the last of the messages delivered.
480         if (!isClosed()) {
481           onDeliveryState(State.ALL_MESSAGES_DELIVERED);
482           deliver();
483         }
484       }
485     }
486     return stream;
487   }
488 
489   @GuardedBy("this")
assembleNextMessage()490   private InputStream assembleNextMessage() {
491     InputStream message;
492     int numBlocks = nextCompleteMessageEnd;
493     nextCompleteMessageEnd = 0;
494     int numBytes = 0;
495     if (numBlocks == 1) {
496       // Single block.
497       TransactionData data = queuedTransactionData.remove(0);
498       numBytes = data.numBytes;
499       if (data.stream != null) {
500         message = data.stream;
501       } else {
502         message = new BlockInputStream(data.block);
503       }
504     } else {
505       byte[][] blocks = new byte[numBlocks][];
506       for (int i = 0; i < numBlocks; i++) {
507         TransactionData data = queuedTransactionData.remove(0);
508         blocks[i] = checkNotNull(data.block);
509         numBytes += blocks[i].length;
510       }
511       message = new BlockInputStream(blocks, numBytes);
512     }
513     firstQueuedTransactionIndex += numBlocks;
514     lookForCompleteMessage();
515     return message;
516   }
517 
518   // ------------------------------------
519   // stats collection.
520 
521   @GuardedBy("this")
reportInboundSize(int size)522   private void reportInboundSize(int size) {
523     inboundDataSize += size;
524     if (statsTraceContext != null && inboundDataSize != 0) {
525       statsTraceContext.inboundWireSize(inboundDataSize);
526       statsTraceContext.inboundUncompressedSize(inboundDataSize);
527       inboundDataSize = 0;
528     }
529   }
530 
531   @GuardedBy("this")
reportInboundMessage(int numBytes)532   private void reportInboundMessage(int numBytes) {
533     checkNotNull(statsTraceContext);
534     statsTraceContext.inboundMessage(numReceivedMessages);
535     statsTraceContext.inboundMessageRead(numReceivedMessages, numBytes, numBytes);
536     numReceivedMessages += 1;
537   }
538 
539   @Override
toString()540   public synchronized String toString() {
541     return getClass().getSimpleName()
542         + "[SfxA="
543         + suffixAvailable
544         + "/De="
545         + deliveryState
546         + "/Msg="
547         + messageAvailable()
548         + "/Lis="
549         + (listener != null)
550         + "]";
551   }
552 
553   // ======================================
554   // Client-side inbound transactions.
555   static final class ClientInbound extends Inbound<ClientStreamListener> {
556 
557     private final boolean countsForInUse;
558 
559     @Nullable
560     @GuardedBy("this")
561     private Status closeStatus;
562 
563     @Nullable
564     @GuardedBy("this")
565     private Metadata trailers;
566 
ClientInbound( BinderTransport transport, Attributes attributes, int callId, boolean countsForInUse)567     ClientInbound(
568         BinderTransport transport, Attributes attributes, int callId, boolean countsForInUse) {
569       super(transport, attributes, callId);
570       this.countsForInUse = countsForInUse;
571     }
572 
573     @Override
countsForInUse()574     boolean countsForInUse() {
575       return countsForInUse;
576     }
577 
578     @Override
579     @GuardedBy("this")
handlePrefix(int flags, Parcel parcel)580     protected void handlePrefix(int flags, Parcel parcel) throws StatusException {
581       Metadata headers = MetadataHelper.readMetadata(parcel, attributes);
582       statsTraceContext.clientInboundHeaders();
583       listener.headersRead(headers);
584     }
585 
586     @Override
587     @GuardedBy("this")
handleSuffix(int flags, Parcel parcel)588     protected void handleSuffix(int flags, Parcel parcel) throws StatusException {
589       closeStatus = TransactionUtils.readStatus(flags, parcel);
590       trailers = MetadataHelper.readMetadata(parcel, attributes);
591     }
592 
593     @Override
594     @GuardedBy("this")
deliverSuffix()595     protected void deliverSuffix() {
596       statsTraceContext.clientInboundTrailers(trailers);
597       statsTraceContext.streamClosed(closeStatus);
598       onDeliveryState(State.CLOSED);
599       listener.closed(closeStatus, RpcProgress.PROCESSED, trailers);
600       unregister();
601     }
602 
603     @Override
604     @GuardedBy("this")
deliverCloseAbnormal(Status status)605     protected void deliverCloseAbnormal(Status status) {
606       listener.closed(status, RpcProgress.PROCESSED, new Metadata());
607     }
608   }
609 
610   // ======================================
611   // Server-side inbound transactions.
612   static final class ServerInbound extends Inbound<ServerStreamListener> {
613 
614     private final BinderTransport.BinderServerTransport serverTransport;
615 
ServerInbound( BinderTransport.BinderServerTransport transport, Attributes attributes, int callId)616     ServerInbound(
617         BinderTransport.BinderServerTransport transport, Attributes attributes, int callId) {
618       super(transport, attributes, callId);
619       this.serverTransport = transport;
620     }
621 
622     @GuardedBy("this")
623     @Override
handlePrefix(int flags, Parcel parcel)624     protected void handlePrefix(int flags, Parcel parcel) throws StatusException {
625       String methodName = parcel.readString();
626       Metadata headers = MetadataHelper.readMetadata(parcel, attributes);
627 
628       StatsTraceContext statsTraceContext =
629           serverTransport.createStatsTraceContext(methodName, headers);
630       Outbound.ServerOutbound outbound =
631           new Outbound.ServerOutbound(serverTransport, callId, statsTraceContext);
632       ServerStream stream;
633       if ((flags & TransactionUtils.FLAG_EXPECT_SINGLE_MESSAGE) != 0) {
634         stream = new SingleMessageServerStream(this, outbound, attributes);
635       } else {
636         stream = new MultiMessageServerStream(this, outbound, attributes);
637       }
638       Status status = serverTransport.startStream(stream, methodName, headers);
639       if (status.isOk()) {
640         checkNotNull(listener); // Is it ok to assume this will happen synchronously?
641         if (transport.isReady()) {
642           listener.onReady();
643         }
644       } else {
645         closeAbnormal(status);
646       }
647     }
648 
649     @GuardedBy("this")
650     @Override
handleSuffix(int flags, Parcel parcel)651     protected void handleSuffix(int flags, Parcel parcel) {
652       // Nothing to read.
653     }
654 
655     @Override
656     @GuardedBy("this")
deliverSuffix()657     protected void deliverSuffix() {
658       listener.halfClosed();
659     }
660 
661     @Override
662     @GuardedBy("this")
deliverCloseAbnormal(Status status)663     protected void deliverCloseAbnormal(Status status) {
664       listener.closed(status);
665     }
666 
667     @GuardedBy("this")
onCloseSent(Status status)668     void onCloseSent(Status status) {
669       if (!isClosed()) {
670         onDeliveryState(State.CLOSED);
671         statsTraceContext.streamClosed(status);
672         listener.closed(Status.OK);
673       }
674     }
675   }
676 
677   // ======================================
678   // Helper methods.
679 
checkTransition(State current, State next)680   private static void checkTransition(State current, State next) {
681     switch (next) {
682       case INITIALIZED:
683         checkState(current == State.UNINITIALIZED, "%s -> %s", current, next);
684         break;
685       case PREFIX_DELIVERED:
686         checkState(
687             current == State.INITIALIZED || current == State.UNINITIALIZED,
688             "%s -> %s",
689             current,
690             next);
691         break;
692       case ALL_MESSAGES_DELIVERED:
693         checkState(current == State.PREFIX_DELIVERED, "%s -> %s", current, next);
694         break;
695       case SUFFIX_DELIVERED:
696         checkState(current == State.ALL_MESSAGES_DELIVERED, "%s -> %s", current, next);
697         break;
698       case CLOSED:
699         break;
700       default:
701         throw new AssertionError();
702     }
703   }
704 
705   // ======================================
706   // Message reassembly.
707 
708   /** Part of an unconsumed message. */
709   private static final class TransactionData {
710     @Nullable final InputStream stream;
711     @Nullable final byte[] block;
712     final int numBytes;
713     final boolean lastBlockOfMessage;
714 
TransactionData(InputStream stream, byte[] block, int numBytes, boolean lastBlockOfMessage)715     TransactionData(InputStream stream, byte[] block, int numBytes, boolean lastBlockOfMessage) {
716       this.stream = stream;
717       this.block = block;
718       this.numBytes = numBytes;
719       this.lastBlockOfMessage = lastBlockOfMessage;
720     }
721 
722     @Override
toString()723     public String toString() {
724       return "TransactionData["
725           + numBytes
726           + "b "
727           + (stream != null ? "stream" : "array")
728           + (lastBlockOfMessage ? "(last)]" : "]");
729     }
730   }
731 }
732