• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net.impl;
6 
7 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_IDLE;
8 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_LOWEST;
9 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_LOW;
10 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_MEDIUM;
11 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_HIGHEST;
12 
13 import androidx.annotation.IntDef;
14 import androidx.annotation.VisibleForTesting;
15 
16 import org.jni_zero.CalledByNative;
17 import org.jni_zero.JNINamespace;
18 import org.jni_zero.NativeClassQualifiedName;
19 import org.jni_zero.NativeMethods;
20 
21 import org.chromium.base.Log;
22 import org.chromium.net.BidirectionalStream;
23 import org.chromium.net.CallbackException;
24 import org.chromium.net.CronetException;
25 import org.chromium.net.ExperimentalBidirectionalStream;
26 import org.chromium.net.NetworkException;
27 import org.chromium.net.RequestFinishedInfo;
28 import org.chromium.net.RequestPriority;
29 import org.chromium.net.UrlResponseInfo;
30 
31 import java.lang.annotation.Retention;
32 import java.lang.annotation.RetentionPolicy;
33 import java.nio.ByteBuffer;
34 import java.util.AbstractMap;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.Collection;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.RejectedExecutionException;
43 
44 import javax.annotation.concurrent.GuardedBy;
45 
46 /**
47  * {@link BidirectionalStream} implementation using Chromium network stack.
48  * All @CalledByNative methods are called on the native network thread
49  * and post tasks with callback calls onto Executor. Upon returning from callback, the native
50  * stream is called on Executor thread and posts native tasks to the native network thread.
51  */
52 @JNINamespace("cronet")
53 @VisibleForTesting
54 public class CronetBidirectionalStream extends ExperimentalBidirectionalStream {
    /**
     * States of BidirectionalStream are tracked in mReadState and mWriteState.
     * The write state is separated out as it changes independently of the read state.
     * There is one initial state: State.NOT_STARTED. There is one normal final state:
     * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. There are two
     * exceptional final states: State.CANCELED and State.ERROR, which can be reached from
     * any other non-final state.
     *
     * <p>The annotation has SOURCE retention, so it only exists for static checking of the
     * {@code int} fields and locals that carry one of these values.
     */
    @IntDef({
        State.NOT_STARTED,
        State.STARTED,
        State.WAITING_FOR_READ,
        State.READING,
        State.READING_DONE,
        State.CANCELED,
        State.ERROR,
        State.SUCCESS,
        State.WAITING_FOR_FLUSH,
        State.WRITING,
        State.WRITING_DONE
    })
    @Retention(RetentionPolicy.SOURCE)
    private @interface State {
        /** Initial state, stream not started. */
        int NOT_STARTED = 0;

        /**
         * Stream started, request headers are being sent if mDelayRequestHeadersUntilFirstFlush
         * is not set to true.
         */
        int STARTED = 1;

        /** Waiting for {@code read()} to be called. */
        int WAITING_FOR_READ = 2;

        /** Reading from the remote, {@code onReadCompleted()} callback will be called when done. */
        int READING = 3;

        /** There is no more data to read and stream is half-closed by the remote side. */
        int READING_DONE = 4;

        /** Stream is canceled. */
        int CANCELED = 5;

        /** Error has occurred, stream is closed. */
        int ERROR = 6;

        /** Reading and writing are done, and the stream is closed successfully. */
        int SUCCESS = 7;

        /**
         * Waiting for {@code CronetBidirectionalStreamJni.get().sendRequestHeaders()} or
         * {@code CronetBidirectionalStreamJni.get().writevData()} to be called.
         */
        int WAITING_FOR_FLUSH = 8;

        /** Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */
        int WRITING = 9;

        /** There is no more data to write and stream is half-closed by the local side. */
        int WRITING_DONE = 10;
    }
105 
    // Immutable configuration captured by the constructor.

    // Supplies the native context adapter in start() and is told when the request starts
    // and finishes.
    private final CronetUrlRequestContext mRequestContext;
    private final Executor mExecutor;
    // Version-safe wrapper around the Callback handed to the constructor.
    private final VersionSafeCallbacks.BidirectionalStreamCallback mCallback;
    private final String mInitialUrl;
    // Result of convertStreamPriority(); compared against RequestPriority values in
    // getPriority().
    private final int mInitialPriority;
    private final String mInitialMethod;
    // Request headers as produced by stringsFromHeaderList(); handed to the native start().
    private final String mRequestHeaders[];
    // The same request headers, exposed through getHeaders().
    private final UrlResponseInfo.HeaderBlock mRequestHeaderBlock;
    private final boolean mDelayRequestHeadersUntilFirstFlush;
    private final Collection<Object> mRequestAnnotations;
    private final boolean mTrafficStatsTagSet;
    private final int mTrafficStatsTag;
    private final boolean mTrafficStatsUidSet;
    private final int mTrafficStatsUid;
    private final long mNetworkHandle;
    // Created in start(). One decrement is for the final callback (e.g. onSucceeded) and one
    // is for onMetricsCollected().
    private RefCountDelegate mInflightDoneCallbackCount;
    private CronetException mException;

    /*
     * Synchronizes access to mNativeStream, mReadState and mWriteState, as well as the other
     * fields below that are annotated with @GuardedBy("mNativeStreamLock").
     */
    private final Object mNativeStreamLock = new Object();

    @GuardedBy("mNativeStreamLock")
    // Pending write data: buffers queued by write() that flush() has not yet moved to
    // mFlushData.
    private LinkedList<ByteBuffer> mPendingData;

    @GuardedBy("mNativeStreamLock")
    // Flush data queue that should be pushed to the native stack when the previous
    // CronetBidirectionalStreamJni.get().writevData completes.
    private LinkedList<ByteBuffer> mFlushData;

    @GuardedBy("mNativeStreamLock")
    // Whether an end-of-stream flag is passed in through write().
    private boolean mEndOfStreamWritten;

    @GuardedBy("mNativeStreamLock")
    // Whether request headers have been sent.
    private boolean mRequestHeadersSent;

    @GuardedBy("mNativeStreamLock")
    // Metrics information. Obtained when request succeeds, fails or is canceled.
    private RequestFinishedInfo.Metrics mMetrics;

    /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
    // A value of 0 after the stream has been started is what isDoneLocked() treats as "done".
    @GuardedBy("mNativeStreamLock")
    private long mNativeStream;

    /**
     * Read state is tracking reading flow.
     *                         / <--- READING <--- \
     *                         |                   |
     *                         \                   /
     * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
     */
    @GuardedBy("mNativeStreamLock")
    private @State int mReadState = State.NOT_STARTED;

    /**
     * Write state is tracking writing flow.
     *                         / <---  WRITING  <--- \
     *                         |                     |
     *                         \                     /
     * NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
     */
    @GuardedBy("mNativeStreamLock")
    private @State int mWriteState = State.NOT_STARTED;

    // Only modified on the network thread.
    private UrlResponseInfoImpl mResponseInfo;

    /*
     * OnReadCompleted callback is repeatedly invoked when each read is completed, so it
     * is cached as a member variable.
     */
    // Only modified on the network thread.
    // NOTE(review): read() also lazily assigns this field under mNativeStreamLock, which is
    // presumably not the network thread - confirm the comment above is still accurate.
    private OnReadCompletedRunnable mOnReadCompletedTask;

    private Runnable mOnDestroyedCallbackForTesting;
185 
186     private final class OnReadCompletedRunnable implements Runnable {
187         // Buffer passed back from current invocation of onReadCompleted.
188         ByteBuffer mByteBuffer;
189         // End of stream flag from current invocation of onReadCompleted.
190         boolean mEndOfStream;
191 
192         @Override
run()193         public void run() {
194             try {
195                 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
196                 ByteBuffer buffer = mByteBuffer;
197                 mByteBuffer = null;
198                 boolean maybeOnSucceeded = false;
199                 synchronized (mNativeStreamLock) {
200                     if (isDoneLocked()) {
201                         return;
202                     }
203                     if (mEndOfStream) {
204                         mReadState = State.READING_DONE;
205                         maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
206                     } else {
207                         mReadState = State.WAITING_FOR_READ;
208                     }
209                 }
210                 mCallback.onReadCompleted(
211                         CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
212                 if (maybeOnSucceeded) {
213                     maybeOnSucceededOnExecutor();
214                 }
215             } catch (Exception e) {
216                 onCallbackException(e);
217             }
218         }
219     }
220 
221     private final class OnWriteCompletedRunnable implements Runnable {
222         // Buffer passed back from current invocation of onWriteCompleted.
223         private ByteBuffer mByteBuffer;
224         // End of stream flag from current call to write.
225         private final boolean mEndOfStream;
226 
OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream)227         OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
228             mByteBuffer = buffer;
229             mEndOfStream = endOfStream;
230         }
231 
232         @Override
run()233         public void run() {
234             try {
235                 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
236                 ByteBuffer buffer = mByteBuffer;
237                 mByteBuffer = null;
238                 boolean maybeOnSucceeded = false;
239                 synchronized (mNativeStreamLock) {
240                     if (isDoneLocked()) {
241                         return;
242                     }
243                     if (mEndOfStream) {
244                         mWriteState = State.WRITING_DONE;
245                         maybeOnSucceeded = (mReadState == State.READING_DONE);
246                     }
247                 }
248                 mCallback.onWriteCompleted(
249                         CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
250                 if (maybeOnSucceeded) {
251                     maybeOnSucceededOnExecutor();
252                 }
253             } catch (Exception e) {
254                 onCallbackException(e);
255             }
256         }
257     }
258 
    /**
     * Captures the stream configuration. No native object is created here; that happens in
     * {@link #start()}.
     *
     * @param requestContext context stored for use when the stream is started
     * @param url URL later passed to the native {@code start()} call
     * @param priority stream priority, converted with {@code convertStreamPriority()} before
     *     being stored
     * @param callback user callback, wrapped in a
     *     {@code VersionSafeCallbacks.BidirectionalStreamCallback}
     * @param executor executor stored for posting callback tasks
     * @param httpMethod HTTP method, returned by {@link #getHttpMethod()}
     * @param requestHeaders request headers; kept both as a string array (via
     *     {@code stringsFromHeaderList()}) and as a header block
     * @param delayRequestHeadersUntilNextFlush stored as
     *     {@code mDelayRequestHeadersUntilFirstFlush}
     * @param requestAnnotations annotations stored as given
     * @param trafficStatsTagSet whether {@code trafficStatsTag} is meaningful
     * @param trafficStatsTag traffic stats tag
     * @param trafficStatsUidSet whether {@code trafficStatsUid} is meaningful
     * @param trafficStatsUid traffic stats UID
     * @param networkHandle network handle passed to the native stream on creation
     */
    CronetBidirectionalStream(
            CronetUrlRequestContext requestContext,
            String url,
            @CronetEngineBase.StreamPriority int priority,
            Callback callback,
            Executor executor,
            String httpMethod,
            List<Map.Entry<String, String>> requestHeaders,
            boolean delayRequestHeadersUntilNextFlush,
            Collection<Object> requestAnnotations,
            boolean trafficStatsTagSet,
            int trafficStatsTag,
            boolean trafficStatsUidSet,
            int trafficStatsUid,
            long networkHandle) {
        mRequestContext = requestContext;
        mInitialUrl = url;
        mInitialPriority = convertStreamPriority(priority);
        mCallback = new VersionSafeCallbacks.BidirectionalStreamCallback(callback);
        mExecutor = executor;
        mInitialMethod = httpMethod;
        mRequestHeaders = stringsFromHeaderList(requestHeaders);
        mRequestHeaderBlock = new UrlResponseInfoImpl.HeaderBlockImpl(requestHeaders);
        mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush;
        // Both write queues start out empty.
        mPendingData = new LinkedList<>();
        mFlushData = new LinkedList<>();
        mRequestAnnotations = requestAnnotations;
        mTrafficStatsTagSet = trafficStatsTagSet;
        mTrafficStatsTag = trafficStatsTag;
        mTrafficStatsUidSet = trafficStatsUidSet;
        mTrafficStatsUid = trafficStatsUid;
        mNetworkHandle = networkHandle;
    }
292 
    /** Returns the HTTP method that was passed to the constructor. */
    @Override
    public String getHttpMethod() {
        return mInitialMethod;
    }
297 
    /** Returns the "traffic stats tag set" flag that was passed to the constructor. */
    @Override
    public boolean hasTrafficStatsTag() {
        return mTrafficStatsTagSet;
    }
302 
303     @Override
getTrafficStatsTag()304     public int getTrafficStatsTag() {
305         if (!hasTrafficStatsTag()) {
306             throw new IllegalStateException("TrafficStatsTag is not set");
307         }
308         return mTrafficStatsTag;
309     }
310 
    /** Returns the "traffic stats UID set" flag that was passed to the constructor. */
    @Override
    public boolean hasTrafficStatsUid() {
        return mTrafficStatsUidSet;
    }
315 
316     @Override
getTrafficStatsUid()317     public int getTrafficStatsUid() {
318         if (!hasTrafficStatsUid()) {
319             throw new IllegalStateException("TrafficStatsUid is not set");
320         }
321         return mTrafficStatsUid;
322     }
323 
    /** Returns the request headers, as the header block built in the constructor. */
    @Override
    public UrlResponseInfo.HeaderBlock getHeaders() {
        return mRequestHeaderBlock;
    }
328 
329     @Override
getPriority()330     public int getPriority() {
331         switch (mInitialPriority) {
332             case RequestPriority.IDLE:
333                 return STREAM_PRIORITY_IDLE;
334             case RequestPriority.LOWEST:
335                 return STREAM_PRIORITY_LOWEST;
336             case RequestPriority.LOW:
337                 return STREAM_PRIORITY_LOW;
338             case RequestPriority.MEDIUM:
339                 return STREAM_PRIORITY_MEDIUM;
340             case RequestPriority.HIGHEST:
341                 return STREAM_PRIORITY_HIGHEST;
342             default:
343                 throw new IllegalStateException("Invalid stream priority: " + mInitialPriority);
344         }
345     }
346 
    /** Returns the delay-request-headers flag that was passed to the constructor. */
    @Override
    public boolean isDelayRequestHeadersUntilFirstFlushEnabled() {
        return mDelayRequestHeadersUntilFirstFlush;
    }
351 
352     @Override
start()353     public void start() {
354         synchronized (mNativeStreamLock) {
355             if (mReadState != State.NOT_STARTED) {
356                 throw new IllegalStateException("Stream is already started.");
357             }
358             try {
359                 mNativeStream =
360                         CronetBidirectionalStreamJni.get()
361                                 .createBidirectionalStream(
362                                         CronetBidirectionalStream.this,
363                                         mRequestContext.getUrlRequestContextAdapter(),
364                                         !mDelayRequestHeadersUntilFirstFlush,
365                                         mTrafficStatsTagSet,
366                                         mTrafficStatsTag,
367                                         mTrafficStatsUidSet,
368                                         mTrafficStatsUid,
369                                         mNetworkHandle);
370                 mRequestContext.onRequestStarted();
371                 mInflightDoneCallbackCount =
372                         new RefCountDelegate(mRequestContext::onRequestFinished);
373                 // We need an initial count of 2: one decrement for the final callback
374                 // (e.g. onSucceeded), and another for onMetricsCollected().
375                 mInflightDoneCallbackCount.increment();
376                 // Non-zero startResult means an argument error.
377                 int startResult =
378                         CronetBidirectionalStreamJni.get()
379                                 .start(
380                                         mNativeStream,
381                                         CronetBidirectionalStream.this,
382                                         mInitialUrl,
383                                         mInitialPriority,
384                                         mInitialMethod,
385                                         mRequestHeaders,
386                                         !doesMethodAllowWriteData(mInitialMethod));
387                 if (startResult == -1) {
388                     throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
389                 }
390                 if (startResult > 0) {
391                     int headerPos = startResult - 1;
392                     throw new IllegalArgumentException(
393                             "Invalid header with headername: " + mRequestHeaders[headerPos]);
394                 }
395                 mReadState = mWriteState = State.STARTED;
396             } catch (RuntimeException e) {
397                 // If there's an exception, clean up and then throw the
398                 // exception to the caller.
399                 destroyNativeStreamLocked(false);
400                 mInflightDoneCallbackCount.decrement();
401                 mInflightDoneCallbackCount.decrement();
402                 throw e;
403             }
404         }
405     }
406 
407     @Override
read(ByteBuffer buffer)408     public void read(ByteBuffer buffer) {
409         synchronized (mNativeStreamLock) {
410             Preconditions.checkHasRemaining(buffer);
411             Preconditions.checkDirect(buffer);
412             if (mReadState != State.WAITING_FOR_READ) {
413                 throw new IllegalStateException("Unexpected read attempt.");
414             }
415             if (isDoneLocked()) {
416                 return;
417             }
418             if (mOnReadCompletedTask == null) {
419                 mOnReadCompletedTask = new OnReadCompletedRunnable();
420             }
421             mReadState = State.READING;
422             if (!CronetBidirectionalStreamJni.get()
423                     .readData(
424                             mNativeStream,
425                             CronetBidirectionalStream.this,
426                             buffer,
427                             buffer.position(),
428                             buffer.limit())) {
429                 // Still waiting on read. This is just to have consistent
430                 // behavior with the other error cases.
431                 mReadState = State.WAITING_FOR_READ;
432                 throw new IllegalArgumentException("Unable to call native read");
433             }
434         }
435     }
436 
437     @Override
write(ByteBuffer buffer, boolean endOfStream)438     public void write(ByteBuffer buffer, boolean endOfStream) {
439         synchronized (mNativeStreamLock) {
440             Preconditions.checkDirect(buffer);
441             if (!buffer.hasRemaining() && !endOfStream) {
442                 throw new IllegalArgumentException("Empty buffer before end of stream.");
443             }
444             if (mEndOfStreamWritten) {
445                 throw new IllegalArgumentException("Write after writing end of stream.");
446             }
447             if (isDoneLocked()) {
448                 return;
449             }
450             mPendingData.add(buffer);
451             if (endOfStream) {
452                 mEndOfStreamWritten = true;
453             }
454         }
455     }
456 
457     @Override
flush()458     public void flush() {
459         synchronized (mNativeStreamLock) {
460             if (isDoneLocked()
461                     || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != State.WRITING)) {
462                 return;
463             }
464             if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
465                 // If there is no pending write when flush() is called, see if
466                 // request headers need to be flushed.
467                 if (!mRequestHeadersSent) {
468                     mRequestHeadersSent = true;
469                     CronetBidirectionalStreamJni.get()
470                             .sendRequestHeaders(mNativeStream, CronetBidirectionalStream.this);
471                     if (!doesMethodAllowWriteData(mInitialMethod)) {
472                         mWriteState = State.WRITING_DONE;
473                     }
474                 }
475                 return;
476             }
477 
478             assert !mPendingData.isEmpty() || !mFlushData.isEmpty();
479 
480             // Move buffers from mPendingData to the flushing queue.
481             if (!mPendingData.isEmpty()) {
482                 mFlushData.addAll(mPendingData);
483                 mPendingData.clear();
484             }
485 
486             if (mWriteState == State.WRITING) {
487                 // If there is a write already pending, wait until onWritevCompleted is
488                 // called before pushing data to the native stack.
489                 return;
490             }
491             sendFlushDataLocked();
492         }
493     }
494 
495     // Helper method to send buffers in mFlushData. Caller needs to acquire
496     // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
497     // mFlushData queue isn't empty.
498     @SuppressWarnings("GuardedByChecker")
sendFlushDataLocked()499     private void sendFlushDataLocked() {
500         assert mWriteState == State.WAITING_FOR_FLUSH;
501         int size = mFlushData.size();
502         ByteBuffer[] buffers = new ByteBuffer[size];
503         int[] positions = new int[size];
504         int[] limits = new int[size];
505         for (int i = 0; i < size; i++) {
506             ByteBuffer buffer = mFlushData.poll();
507             buffers[i] = buffer;
508             positions[i] = buffer.position();
509             limits[i] = buffer.limit();
510         }
511         assert mFlushData.isEmpty();
512         assert buffers.length >= 1;
513         mWriteState = State.WRITING;
514         mRequestHeadersSent = true;
515         if (!CronetBidirectionalStreamJni.get()
516                 .writevData(
517                         mNativeStream,
518                         CronetBidirectionalStream.this,
519                         buffers,
520                         positions,
521                         limits,
522                         mEndOfStreamWritten && mPendingData.isEmpty())) {
523             // Still waiting on flush. This is just to have consistent
524             // behavior with the other error cases.
525             mWriteState = State.WAITING_FOR_FLUSH;
526             throw new IllegalArgumentException("Unable to call native writev.");
527         }
528     }
529 
530     /** Returns a read-only copy of {@code mPendingData} for testing. */
getPendingDataForTesting()531     public List<ByteBuffer> getPendingDataForTesting() {
532         synchronized (mNativeStreamLock) {
533             List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>();
534             for (ByteBuffer buffer : mPendingData) {
535                 pendingData.add(buffer.asReadOnlyBuffer());
536             }
537             return pendingData;
538         }
539     }
540 
541     /** Returns a read-only copy of {@code mFlushData} for testing. */
getFlushDataForTesting()542     public List<ByteBuffer> getFlushDataForTesting() {
543         synchronized (mNativeStreamLock) {
544             List<ByteBuffer> flushData = new LinkedList<ByteBuffer>();
545             for (ByteBuffer buffer : mFlushData) {
546                 flushData.add(buffer.asReadOnlyBuffer());
547             }
548             return flushData;
549         }
550     }
551 
552     @Override
cancel()553     public void cancel() {
554         synchronized (mNativeStreamLock) {
555             if (isDoneLocked() || mReadState == State.NOT_STARTED) {
556                 return;
557             }
558             mReadState = mWriteState = State.CANCELED;
559             destroyNativeStreamLocked(true);
560         }
561     }
562 
    /**
     * Returns whether the stream is done, i.e. it has been started and its native stream
     * handle is 0. Takes {@code mNativeStreamLock} for the check.
     */
    @Override
    public boolean isDone() {
        synchronized (mNativeStreamLock) {
            return isDoneLocked();
        }
    }
569 
570     @GuardedBy("mNativeStreamLock")
isDoneLocked()571     private boolean isDoneLocked() {
572         return mReadState != State.NOT_STARTED && mNativeStream == 0;
573     }
574 
575     /*
576      * Runs an onSucceeded callback if both Read and Write sides are closed.
577      */
maybeOnSucceededOnExecutor()578     private void maybeOnSucceededOnExecutor() {
579         synchronized (mNativeStreamLock) {
580             if (isDoneLocked()) {
581                 return;
582             }
583             if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) {
584                 return;
585             }
586             mReadState = mWriteState = State.SUCCESS;
587             // Destroy native stream first, so UrlRequestContext could be shut
588             // down from the listener.
589             destroyNativeStreamLocked(false);
590         }
591         try {
592             mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
593         } catch (Exception e) {
594             Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
595         }
596         mInflightDoneCallbackCount.decrement();
597     }
598 
599     @SuppressWarnings("unused")
600     @CalledByNative
onStreamReady(final boolean requestHeadersSent)601     private void onStreamReady(final boolean requestHeadersSent) {
602         postTaskToExecutor(
603                 new Runnable() {
604                     @Override
605                     public void run() {
606                         synchronized (mNativeStreamLock) {
607                             if (isDoneLocked()) {
608                                 return;
609                             }
610                             mRequestHeadersSent = requestHeadersSent;
611                             mReadState = State.WAITING_FOR_READ;
612                             if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHeadersSent) {
613                                 mWriteState = State.WRITING_DONE;
614                             } else {
615                                 mWriteState = State.WAITING_FOR_FLUSH;
616                             }
617                         }
618 
619                         try {
620                             mCallback.onStreamReady(CronetBidirectionalStream.this);
621                         } catch (Exception e) {
622                             onCallbackException(e);
623                         }
624                     }
625                 });
626     }
627 
628     /**
629      * Called when the final set of headers, after all redirects,
630      * is received. Can only be called once for each stream.
631      */
632     @SuppressWarnings("unused")
633     @CalledByNative
onResponseHeadersReceived( int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)634     private void onResponseHeadersReceived(
635             int httpStatusCode,
636             String negotiatedProtocol,
637             String[] headers,
638             long receivedByteCount) {
639         try {
640             mResponseInfo =
641                     prepareResponseInfoOnNetworkThread(
642                             httpStatusCode, negotiatedProtocol, headers, receivedByteCount);
643         } catch (Exception e) {
644             failWithException(new CronetExceptionImpl("Cannot prepare ResponseInfo", null));
645             return;
646         }
647         postTaskToExecutor(
648                 new Runnable() {
649                     @Override
650                     public void run() {
651                         synchronized (mNativeStreamLock) {
652                             if (isDoneLocked()) {
653                                 return;
654                             }
655                             mReadState = State.WAITING_FOR_READ;
656                         }
657 
658                         try {
659                             mCallback.onResponseHeadersReceived(
660                                     CronetBidirectionalStream.this, mResponseInfo);
661                         } catch (Exception e) {
662                             onCallbackException(e);
663                         }
664                     }
665                 });
666     }
667 
668     @SuppressWarnings("unused")
669     @CalledByNative
onReadCompleted( final ByteBuffer byteBuffer, int bytesRead, int initialPosition, int initialLimit, long receivedByteCount)670     private void onReadCompleted(
671             final ByteBuffer byteBuffer,
672             int bytesRead,
673             int initialPosition,
674             int initialLimit,
675             long receivedByteCount) {
676         mResponseInfo.setReceivedByteCount(receivedByteCount);
677         if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
678             failWithException(
679                     new CronetExceptionImpl("ByteBuffer modified externally during read", null));
680             return;
681         }
682         if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) {
683             failWithException(new CronetExceptionImpl("Invalid number of bytes read", null));
684             return;
685         }
686         byteBuffer.position(initialPosition + bytesRead);
687         assert mOnReadCompletedTask.mByteBuffer == null;
688         mOnReadCompletedTask.mByteBuffer = byteBuffer;
689         mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
690         postTaskToExecutor(mOnReadCompletedTask);
691     }
692 
693     @SuppressWarnings("unused")
694     @CalledByNative
onWritevCompleted( final ByteBuffer[] byteBuffers, int[] initialPositions, int[] initialLimits, boolean endOfStream)695     private void onWritevCompleted(
696             final ByteBuffer[] byteBuffers,
697             int[] initialPositions,
698             int[] initialLimits,
699             boolean endOfStream) {
700         assert byteBuffers.length == initialPositions.length;
701         assert byteBuffers.length == initialLimits.length;
702         synchronized (mNativeStreamLock) {
703             if (isDoneLocked()) return;
704             mWriteState = State.WAITING_FOR_FLUSH;
705             // Flush if there is anything in the flush queue mFlushData.
706             if (!mFlushData.isEmpty()) {
707                 sendFlushDataLocked();
708             }
709         }
710         for (int i = 0; i < byteBuffers.length; i++) {
711             ByteBuffer buffer = byteBuffers[i];
712             if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
713                 failWithException(
714                         new CronetExceptionImpl(
715                                 "ByteBuffer modified externally during write", null));
716                 return;
717             }
718             // Current implementation always writes the complete buffer.
719             buffer.position(buffer.limit());
720             postTaskToExecutor(
721                     new OnWriteCompletedRunnable(
722                             buffer,
723                             // Only set endOfStream flag if this buffer is the last in byteBuffers.
724                             endOfStream && i == byteBuffers.length - 1));
725         }
726     }
727 
728     @SuppressWarnings("unused")
729     @CalledByNative
onResponseTrailersReceived(String[] trailers)730     private void onResponseTrailersReceived(String[] trailers) {
731         final UrlResponseInfo.HeaderBlock trailersBlock =
732                 new UrlResponseInfoImpl.HeaderBlockImpl(headersListFromStrings(trailers));
733         postTaskToExecutor(
734                 new Runnable() {
735                     @Override
736                     public void run() {
737                         synchronized (mNativeStreamLock) {
738                             if (isDoneLocked()) {
739                                 return;
740                             }
741                         }
742                         try {
743                             mCallback.onResponseTrailersReceived(
744                                     CronetBidirectionalStream.this, mResponseInfo, trailersBlock);
745                         } catch (Exception e) {
746                             onCallbackException(e);
747                         }
748                     }
749                 });
750     }
751 
752     @SuppressWarnings("unused")
753     @CalledByNative
onError( int errorCode, int nativeError, int nativeQuicError, String errorString, long receivedByteCount)754     private void onError(
755             int errorCode,
756             int nativeError,
757             int nativeQuicError,
758             String errorString,
759             long receivedByteCount) {
760         if (mResponseInfo != null) {
761             mResponseInfo.setReceivedByteCount(receivedByteCount);
762         }
763         if (errorCode == NetworkException.ERROR_QUIC_PROTOCOL_FAILED
764                 || errorCode == NetworkException.ERROR_NETWORK_CHANGED) {
765             failWithException(
766                     new QuicExceptionImpl(
767                             "Exception in BidirectionalStream: " + errorString,
768                             errorCode,
769                             nativeError,
770                             nativeQuicError));
771         } else {
772             failWithException(
773                     new BidirectionalStreamNetworkException(
774                             "Exception in BidirectionalStream: " + errorString,
775                             errorCode,
776                             nativeError));
777         }
778     }
779 
780     /** Called when request is canceled, no callbacks will be called afterwards. */
781     @SuppressWarnings("unused")
782     @CalledByNative
onCanceled()783     private void onCanceled() {
784         postTaskToExecutor(
785                 new Runnable() {
786                     @Override
787                     public void run() {
788                         try {
789                             mCallback.onCanceled(CronetBidirectionalStream.this, mResponseInfo);
790                         } catch (Exception e) {
791                             Log.e(
792                                     CronetUrlRequestContext.LOG_TAG,
793                                     "Exception in onCanceled method",
794                                     e);
795                         }
796                         mInflightDoneCallbackCount.decrement();
797                     }
798                 });
799     }
800 
801     /** Called by the native code to report metrics just before the native adapter is destroyed. */
802     @SuppressWarnings("unused")
803     @CalledByNative
onMetricsCollected( long requestStartMs, long dnsStartMs, long dnsEndMs, long connectStartMs, long connectEndMs, long sslStartMs, long sslEndMs, long sendingStartMs, long sendingEndMs, long pushStartMs, long pushEndMs, long responseStartMs, long requestEndMs, boolean socketReused, long sentByteCount, long receivedByteCount)804     private void onMetricsCollected(
805             long requestStartMs,
806             long dnsStartMs,
807             long dnsEndMs,
808             long connectStartMs,
809             long connectEndMs,
810             long sslStartMs,
811             long sslEndMs,
812             long sendingStartMs,
813             long sendingEndMs,
814             long pushStartMs,
815             long pushEndMs,
816             long responseStartMs,
817             long requestEndMs,
818             boolean socketReused,
819             long sentByteCount,
820             long receivedByteCount) {
821         synchronized (mNativeStreamLock) {
822             try {
823                 if (mMetrics != null) {
824                     throw new IllegalStateException("Metrics collection should only happen once.");
825                 }
826                 mMetrics =
827                         new CronetMetrics(
828                                 requestStartMs,
829                                 dnsStartMs,
830                                 dnsEndMs,
831                                 connectStartMs,
832                                 connectEndMs,
833                                 sslStartMs,
834                                 sslEndMs,
835                                 sendingStartMs,
836                                 sendingEndMs,
837                                 pushStartMs,
838                                 pushEndMs,
839                                 responseStartMs,
840                                 requestEndMs,
841                                 socketReused,
842                                 sentByteCount,
843                                 receivedByteCount);
844                 assert mReadState == mWriteState;
845                 assert (mReadState == State.SUCCESS)
846                         || (mReadState == State.ERROR)
847                         || (mReadState == State.CANCELED);
848                 int finishedReason;
849                 if (mReadState == State.SUCCESS) {
850                     finishedReason = RequestFinishedInfo.SUCCEEDED;
851                 } else if (mReadState == State.CANCELED) {
852                     finishedReason = RequestFinishedInfo.CANCELED;
853                 } else {
854                     finishedReason = RequestFinishedInfo.FAILED;
855                 }
856                 final RequestFinishedInfo requestFinishedInfo =
857                         new RequestFinishedInfoImpl(
858                                 mInitialUrl,
859                                 mRequestAnnotations,
860                                 mMetrics,
861                                 finishedReason,
862                                 mResponseInfo,
863                                 mException);
864                 mRequestContext.reportRequestFinished(
865                         requestFinishedInfo, mInflightDoneCallbackCount);
866             } finally {
867                 mInflightDoneCallbackCount.decrement();
868             }
869         }
870     }
871 
setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting)872     public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) {
873         mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
874     }
875 
doesMethodAllowWriteData(String methodName)876     private static boolean doesMethodAllowWriteData(String methodName) {
877         return !methodName.equals("GET") && !methodName.equals("HEAD");
878     }
879 
headersListFromStrings(String[] headers)880     private static ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) {
881         ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(headers.length / 2);
882         for (int i = 0; i < headers.length; i += 2) {
883             headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], headers[i + 1]));
884         }
885         return headersList;
886     }
887 
stringsFromHeaderList(List<Map.Entry<String, String>> headersList)888     private static String[] stringsFromHeaderList(List<Map.Entry<String, String>> headersList) {
889         String headersArray[] = new String[headersList.size() * 2];
890         int i = 0;
891         for (Map.Entry<String, String> requestHeader : headersList) {
892             headersArray[i++] = requestHeader.getKey();
893             headersArray[i++] = requestHeader.getValue();
894         }
895         return headersArray;
896     }
897 
convertStreamPriority(@ronetEngineBase.StreamPriority int priority)898     private static int convertStreamPriority(@CronetEngineBase.StreamPriority int priority) {
899         switch (priority) {
900             case Builder.STREAM_PRIORITY_IDLE:
901                 return RequestPriority.IDLE;
902             case Builder.STREAM_PRIORITY_LOWEST:
903                 return RequestPriority.LOWEST;
904             case Builder.STREAM_PRIORITY_LOW:
905                 return RequestPriority.LOW;
906             case Builder.STREAM_PRIORITY_MEDIUM:
907                 return RequestPriority.MEDIUM;
908             case Builder.STREAM_PRIORITY_HIGHEST:
909                 return RequestPriority.HIGHEST;
910             default:
911                 throw new IllegalArgumentException("Invalid stream priority.");
912         }
913     }
914 
915     /**
916      * Posts task to application Executor. Used for callbacks
917      * and other tasks that should not be executed on network thread.
918      */
postTaskToExecutor(Runnable task)919     private void postTaskToExecutor(Runnable task) {
920         try {
921             mExecutor.execute(task);
922         } catch (RejectedExecutionException failException) {
923             Log.e(
924                     CronetUrlRequestContext.LOG_TAG,
925                     "Exception posting task to executor",
926                     failException);
927             // If posting a task throws an exception, then there is no choice
928             // but to destroy the stream without invoking the callback.
929             synchronized (mNativeStreamLock) {
930                 mReadState = mWriteState = State.ERROR;
931                 destroyNativeStreamLocked(false);
932             }
933         }
934     }
935 
prepareResponseInfoOnNetworkThread( int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)936     private UrlResponseInfoImpl prepareResponseInfoOnNetworkThread(
937             int httpStatusCode,
938             String negotiatedProtocol,
939             String[] headers,
940             long receivedByteCount) {
941         UrlResponseInfoImpl responseInfo =
942                 new UrlResponseInfoImpl(
943                         Arrays.asList(mInitialUrl),
944                         httpStatusCode,
945                         "",
946                         headersListFromStrings(headers),
947                         false,
948                         negotiatedProtocol,
949                         null,
950                         receivedByteCount);
951         return responseInfo;
952     }
953 
954     @GuardedBy("mNativeStreamLock")
destroyNativeStreamLocked(boolean sendOnCanceled)955     private void destroyNativeStreamLocked(boolean sendOnCanceled) {
956         Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + this.toString());
957         if (mNativeStream == 0) {
958             return;
959         }
960         CronetBidirectionalStreamJni.get()
961                 .destroy(mNativeStream, CronetBidirectionalStream.this, sendOnCanceled);
962         mRequestContext.onRequestDestroyed();
963         mNativeStream = 0;
964         if (mOnDestroyedCallbackForTesting != null) {
965             mOnDestroyedCallbackForTesting.run();
966         }
967     }
968 
969     /** Fails the stream with an exception. Only called on the Executor. */
failWithExceptionOnExecutor(CronetException e)970     private void failWithExceptionOnExecutor(CronetException e) {
971         mException = e;
972         // Do not call into mCallback if request is complete.
973         synchronized (mNativeStreamLock) {
974             if (isDoneLocked()) {
975                 return;
976             }
977             mReadState = mWriteState = State.ERROR;
978             destroyNativeStreamLocked(false);
979         }
980         try {
981             mCallback.onFailed(this, mResponseInfo, e);
982         } catch (Exception failException) {
983             Log.e(
984                     CronetUrlRequestContext.LOG_TAG,
985                     "Exception notifying of failed request",
986                     failException);
987         }
988         mInflightDoneCallbackCount.decrement();
989     }
990 
991     /**
992      * If callback method throws an exception, stream gets canceled
993      * and exception is reported via onFailed callback.
994      * Only called on the Executor.
995      */
onCallbackException(Exception e)996     private void onCallbackException(Exception e) {
997         CallbackException streamError =
998                 new CallbackExceptionImpl("CalledByNative method has thrown an exception", e);
999         Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative method", e);
1000         failWithExceptionOnExecutor(streamError);
1001     }
1002 
1003     /** Fails the stream with an exception. Can be called on any thread. */
failWithException(final CronetException exception)1004     private void failWithException(final CronetException exception) {
1005         postTaskToExecutor(
1006                 new Runnable() {
1007                     @Override
1008                     public void run() {
1009                         failWithExceptionOnExecutor(exception);
1010                     }
1011                 });
1012     }
1013 
1014     @NativeMethods
1015     interface Natives {
1016         // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
createBidirectionalStream( CronetBidirectionalStream caller, long urlRequestContextAdapter, boolean sendRequestHeadersAutomatically, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)1017         long createBidirectionalStream(
1018                 CronetBidirectionalStream caller,
1019                 long urlRequestContextAdapter,
1020                 boolean sendRequestHeadersAutomatically,
1021                 boolean trafficStatsTagSet,
1022                 int trafficStatsTag,
1023                 boolean trafficStatsUidSet,
1024                 int trafficStatsUid,
1025                 long networkHandle);
1026 
1027         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
start( long nativePtr, CronetBidirectionalStream caller, String url, int priority, String method, String[] headers, boolean endOfStream)1028         int start(
1029                 long nativePtr,
1030                 CronetBidirectionalStream caller,
1031                 String url,
1032                 int priority,
1033                 String method,
1034                 String[] headers,
1035                 boolean endOfStream);
1036 
1037         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller)1038         void sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller);
1039 
1040         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
readData( long nativePtr, CronetBidirectionalStream caller, ByteBuffer byteBuffer, int position, int limit)1041         boolean readData(
1042                 long nativePtr,
1043                 CronetBidirectionalStream caller,
1044                 ByteBuffer byteBuffer,
1045                 int position,
1046                 int limit);
1047 
1048         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
writevData( long nativePtr, CronetBidirectionalStream caller, ByteBuffer[] buffers, int[] positions, int[] limits, boolean endOfStream)1049         boolean writevData(
1050                 long nativePtr,
1051                 CronetBidirectionalStream caller,
1052                 ByteBuffer[] buffers,
1053                 int[] positions,
1054                 int[] limits,
1055                 boolean endOfStream);
1056 
1057         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled)1058         void destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled);
1059     }
1060 }
1061