• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net.impl;
6 
7 import androidx.annotation.IntDef;
8 import androidx.annotation.VisibleForTesting;
9 
10 import org.chromium.base.Log;
11 import org.chromium.base.annotations.CalledByNative;
12 import org.chromium.base.annotations.JNINamespace;
13 import org.chromium.base.annotations.NativeClassQualifiedName;
14 import org.chromium.base.annotations.NativeMethods;
15 import android.net.http.BidirectionalStream;
16 import android.net.http.CallbackException;
17 import android.net.http.HeaderBlock;
18 import android.net.http.HttpException;
19 import android.net.http.ExperimentalBidirectionalStream;
20 import android.net.http.NetworkException;
21 import android.net.http.RequestFinishedInfo;
22 import org.chromium.net.RequestPriority;
23 import android.net.http.UrlResponseInfo;
24 
25 import java.lang.annotation.Retention;
26 import java.lang.annotation.RetentionPolicy;
27 import java.nio.ByteBuffer;
28 import java.util.AbstractMap;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.RejectedExecutionException;
37 
38 import javax.annotation.concurrent.GuardedBy;
39 
40 /**
41  * {@link BidirectionalStream} implementation using Chromium network stack.
42  * All @CalledByNative methods are called on the native network thread
43  * and post tasks with callback calls onto Executor. Upon returning from callback, the native
44  * stream is called on Executor thread and posts native tasks to the native network thread.
45  */
46 @JNINamespace("cronet")
47 @VisibleForTesting
48 public class CronetBidirectionalStream extends ExperimentalBidirectionalStream {
49     /**
50      * States of BidirectionalStream are tracked in mReadState and mWriteState.
51      * The write state is separated out as it changes independently of the read state.
52      * There is one initial state: State.NOT_STARTED. There is one normal final state:
53      * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. There are two
54      * exceptional final states: State.CANCELED and State.ERROR, which can be reached from
55      * any other non-final state.
56      */
57     @IntDef({State.NOT_STARTED, State.STARTED, State.WAITING_FOR_READ, State.READING,
58             State.READING_DONE, State.CANCELED, State.ERROR, State.SUCCESS, State.WAITING_FOR_FLUSH,
59             State.WRITING, State.WRITING_DONE})
60     @Retention(RetentionPolicy.SOURCE)
61     private @interface State {
62         /* Initial state, stream not started. */
63         int NOT_STARTED = 0;
64         /*
65          * Stream started, request headers are being sent if mDelayRequestHeadersUntilNextFlush
66          * is not set to true.
67          */
68         int STARTED = 1;
69         /* Waiting for {@code read()} to be called. */
70         int WAITING_FOR_READ = 2;
71         /* Reading from the remote, {@code onReadCompleted()} callback will be called when done. */
72         int READING = 3;
73         /* There is no more data to read and stream is half-closed by the remote side. */
74         int READING_DONE = 4;
75         /* Stream is canceled. */
76         int CANCELED = 5;
77         /* Error has occurred, stream is closed. */
78         int ERROR = 6;
79         /* Reading and writing are done, and the stream is closed successfully. */
80         int SUCCESS = 7;
81         /* Waiting for {@code CronetBidirectionalStreamJni.get().sendRequestHeaders()} or {@code
82            CronetBidirectionalStreamJni.get().writevData()} to be called. */
83         int WAITING_FOR_FLUSH = 8;
84         /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */
85         int WRITING = 9;
86         /* There is no more data to write and stream is half-closed by the local side. */
87         int WRITING_DONE = 10;
88     }
89 
90     private final CronetUrlRequestContext mRequestContext;
91     private final Executor mExecutor;
92     private final VersionSafeCallbacks.BidirectionalStreamCallback mCallback;
93     private final String mInitialUrl;
94     private final int mInitialPriority;
95     private final String mInitialMethod;
96     private final String mRequestHeaders[];
97     private final HeaderBlock mRequestHeaderBlock;
98     private final boolean mDelayRequestHeadersUntilFirstFlush;
99     private final Collection<Object> mRequestAnnotations;
100     private final boolean mTrafficStatsTagSet;
101     private final int mTrafficStatsTag;
102     private final boolean mTrafficStatsUidSet;
103     private final int mTrafficStatsUid;
104     private final long mNetworkHandle;
105     private HttpException mException;
106 
107     /*
108      * Synchronizes access to mNativeStream, mReadState and mWriteState.
109      */
110     private final Object mNativeStreamLock = new Object();
111 
112     @GuardedBy("mNativeStreamLock")
113     // Pending write data.
114     private LinkedList<ByteBuffer> mPendingData;
115 
116     @GuardedBy("mNativeStreamLock")
117     // Flush data queue that should be pushed to the native stack when the previous
118     // CronetBidirectionalStreamJni.get().writevData completes.
119     private LinkedList<ByteBuffer> mFlushData;
120 
121     @GuardedBy("mNativeStreamLock")
122     // Whether an end-of-stream flag is passed in through write().
123     private boolean mEndOfStreamWritten;
124 
125     @GuardedBy("mNativeStreamLock")
126     // Whether request headers have been sent.
127     private boolean mRequestHeadersSent;
128 
129     @GuardedBy("mNativeStreamLock")
130     // Metrics information. Obtained when request succeeds, fails or is canceled.
131     private RequestFinishedInfo.Metrics mMetrics;
132 
133     /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
134     @GuardedBy("mNativeStreamLock")
135     private long mNativeStream;
136 
137     /**
138      * Read state is tracking reading flow.
139      *                         / <--- READING <--- \
140      *                         |                   |
141      *                         \                   /
142      * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
143      */
144     @GuardedBy("mNativeStreamLock")
145     private @State int mReadState = State.NOT_STARTED;
146 
147     /**
148      * Write state is tracking writing flow.
149      *                         / <---  WRITING  <--- \
150      *                         |                     |
151      *                         \                     /
152      * NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
153      */
154     @GuardedBy("mNativeStreamLock")
155     private @State int mWriteState = State.NOT_STARTED;
156 
157     // Only modified on the network thread.
158     private UrlResponseInfoImpl mResponseInfo;
159 
160     /*
161      * OnReadCompleted callback is repeatedly invoked when each read is completed, so it
162      * is cached as a member variable.
163      */
164     // Only modified on the network thread.
165     private OnReadCompletedRunnable mOnReadCompletedTask;
166 
167     private Runnable mOnDestroyedCallbackForTesting;
168 
169     private final class OnReadCompletedRunnable implements Runnable {
170         // Buffer passed back from current invocation of onReadCompleted.
171         ByteBuffer mByteBuffer;
172         // End of stream flag from current invocation of onReadCompleted.
173         boolean mEndOfStream;
174 
175         @Override
run()176         public void run() {
177             try {
178                 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
179                 ByteBuffer buffer = mByteBuffer;
180                 mByteBuffer = null;
181                 boolean maybeOnSucceeded = false;
182                 synchronized (mNativeStreamLock) {
183                     if (isDoneLocked()) {
184                         return;
185                     }
186                     if (mEndOfStream) {
187                         mReadState = State.READING_DONE;
188                         maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
189                     } else {
190                         mReadState = State.WAITING_FOR_READ;
191                     }
192                 }
193                 mCallback.onReadCompleted(
194                         CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
195                 if (maybeOnSucceeded) {
196                     maybeOnSucceededOnExecutor();
197                 }
198             } catch (Exception e) {
199                 onCallbackException(e);
200             }
201         }
202     }
203 
204     private final class OnWriteCompletedRunnable implements Runnable {
205         // Buffer passed back from current invocation of onWriteCompleted.
206         private ByteBuffer mByteBuffer;
207         // End of stream flag from current call to write.
208         private final boolean mEndOfStream;
209 
OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream)210         OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
211             mByteBuffer = buffer;
212             mEndOfStream = endOfStream;
213         }
214 
215         @Override
run()216         public void run() {
217             try {
218                 // Null out mByteBuffer, to pass buffer ownership to callback or release if done.
219                 ByteBuffer buffer = mByteBuffer;
220                 mByteBuffer = null;
221                 boolean maybeOnSucceeded = false;
222                 synchronized (mNativeStreamLock) {
223                     if (isDoneLocked()) {
224                         return;
225                     }
226                     if (mEndOfStream) {
227                         mWriteState = State.WRITING_DONE;
228                         maybeOnSucceeded = (mReadState == State.READING_DONE);
229                     }
230                 }
231                 mCallback.onWriteCompleted(
232                         CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
233                 if (maybeOnSucceeded) {
234                     maybeOnSucceededOnExecutor();
235                 }
236             } catch (Exception e) {
237                 onCallbackException(e);
238             }
239         }
240     }
241 
CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, @CronetEngineBase.StreamPriority int priority, Callback callback, Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, boolean delayRequestHeadersUntilNextFlush, Collection<Object> requestAnnotations, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)242     CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url,
243             @CronetEngineBase.StreamPriority int priority, Callback callback, Executor executor,
244             String httpMethod, List<Map.Entry<String, String>> requestHeaders,
245             boolean delayRequestHeadersUntilNextFlush, Collection<Object> requestAnnotations,
246             boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet,
247             int trafficStatsUid, long networkHandle) {
248         mRequestContext = requestContext;
249         mInitialUrl = url;
250         mInitialPriority = convertStreamPriority(priority);
251         mCallback = new VersionSafeCallbacks.BidirectionalStreamCallback(callback);
252         mExecutor = executor;
253         mInitialMethod = httpMethod;
254         mRequestHeaders = stringsFromHeaderList(requestHeaders);
255         mRequestHeaderBlock = new HeaderBlockImpl(requestHeaders);
256         mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush;
257         mPendingData = new LinkedList<>();
258         mFlushData = new LinkedList<>();
259         mRequestAnnotations = requestAnnotations;
260         mTrafficStatsTagSet = trafficStatsTagSet;
261         mTrafficStatsTag = trafficStatsTag;
262         mTrafficStatsUidSet = trafficStatsUidSet;
263         mTrafficStatsUid = trafficStatsUid;
264         mNetworkHandle = networkHandle;
265     }
266 
267     @Override
getHttpMethod()268     public String getHttpMethod() {
269         return mInitialMethod;
270     }
271 
272     @Override
hasTrafficStatsTag()273     public boolean hasTrafficStatsTag() {
274         return mTrafficStatsTagSet;
275     }
276 
277     @Override
getTrafficStatsTag()278     public int getTrafficStatsTag() {
279         if (!hasTrafficStatsTag()) {
280             throw new IllegalStateException("TrafficStatsTag is not set");
281         }
282         return mTrafficStatsTag;
283     }
284 
285     @Override
hasTrafficStatsUid()286     public boolean hasTrafficStatsUid() {
287         return mTrafficStatsUidSet;
288     }
289 
290     @Override
getTrafficStatsUid()291     public int getTrafficStatsUid() {
292         if (!hasTrafficStatsUid()) {
293             throw new IllegalStateException("TrafficStatsUid is not set");
294         }
295         return mTrafficStatsUid;
296     }
297 
298     @Override
getHeaders()299     public HeaderBlock getHeaders() {
300         return mRequestHeaderBlock;
301     }
302 
303     @Override
getPriority()304     public int getPriority() {
305         switch (mInitialPriority) {
306             case RequestPriority.IDLE:
307                 return STREAM_PRIORITY_IDLE;
308             case RequestPriority.LOWEST:
309                 return STREAM_PRIORITY_LOWEST;
310             case RequestPriority.LOW:
311                 return STREAM_PRIORITY_LOW;
312             case RequestPriority.MEDIUM:
313                 return STREAM_PRIORITY_MEDIUM;
314             case RequestPriority.HIGHEST:
315                 return STREAM_PRIORITY_HIGHEST;
316             default:
317                 throw new IllegalStateException("Invalid stream priority: " + mInitialPriority);
318         }
319     }
320 
321     @Override
isDelayRequestHeadersUntilFirstFlushEnabled()322     public boolean isDelayRequestHeadersUntilFirstFlushEnabled() {
323         return mDelayRequestHeadersUntilFirstFlush;
324     }
325 
326     @Override
start()327     public void start() {
328         synchronized (mNativeStreamLock) {
329             if (mReadState != State.NOT_STARTED) {
330                 throw new IllegalStateException("Stream is already started.");
331             }
332             try {
333                 mNativeStream = CronetBidirectionalStreamJni.get().createBidirectionalStream(
334                         CronetBidirectionalStream.this,
335                         mRequestContext.getUrlRequestContextAdapter(),
336                         !mDelayRequestHeadersUntilFirstFlush, mTrafficStatsTagSet, mTrafficStatsTag,
337                         mTrafficStatsUidSet, mTrafficStatsUid, mNetworkHandle);
338                 mRequestContext.onRequestStarted();
339                 // Non-zero startResult means an argument error.
340                 int startResult = CronetBidirectionalStreamJni.get().start(mNativeStream,
341                         CronetBidirectionalStream.this, mInitialUrl, mInitialPriority,
342                         mInitialMethod, mRequestHeaders, !doesMethodAllowWriteData(mInitialMethod));
343                 if (startResult == -1) {
344                     throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
345                 }
346                 if (startResult > 0) {
347                     int headerPos = startResult - 1;
348                     throw new IllegalArgumentException("Invalid header "
349                             + mRequestHeaders[headerPos] + "=" + mRequestHeaders[headerPos + 1]);
350                 }
351                 mReadState = mWriteState = State.STARTED;
352             } catch (RuntimeException e) {
353                 // If there's an exception, clean up and then throw the
354                 // exception to the caller.
355                 destroyNativeStreamLocked(false);
356                 throw e;
357             }
358         }
359     }
360 
361     @Override
read(ByteBuffer buffer)362     public void read(ByteBuffer buffer) {
363         synchronized (mNativeStreamLock) {
364             Preconditions.checkHasRemaining(buffer);
365             Preconditions.checkDirect(buffer);
366             if (mReadState != State.WAITING_FOR_READ) {
367                 throw new IllegalStateException("Unexpected read attempt.");
368             }
369             if (isDoneLocked()) {
370                 return;
371             }
372             if (mOnReadCompletedTask == null) {
373                 mOnReadCompletedTask = new OnReadCompletedRunnable();
374             }
375             mReadState = State.READING;
376             if (!CronetBidirectionalStreamJni.get().readData(mNativeStream,
377                         CronetBidirectionalStream.this, buffer, buffer.position(),
378                         buffer.limit())) {
379                 // Still waiting on read. This is just to have consistent
380                 // behavior with the other error cases.
381                 mReadState = State.WAITING_FOR_READ;
382                 throw new IllegalArgumentException("Unable to call native read");
383             }
384         }
385     }
386 
387     @Override
write(ByteBuffer buffer, boolean endOfStream)388     public void write(ByteBuffer buffer, boolean endOfStream) {
389         synchronized (mNativeStreamLock) {
390             Preconditions.checkDirect(buffer);
391             if (!buffer.hasRemaining() && !endOfStream) {
392                 throw new IllegalArgumentException("Empty buffer before end of stream.");
393             }
394             if (mEndOfStreamWritten) {
395                 throw new IllegalArgumentException("Write after writing end of stream.");
396             }
397             if (isDoneLocked()) {
398                 return;
399             }
400             mPendingData.add(buffer);
401             if (endOfStream) {
402                 mEndOfStreamWritten = true;
403             }
404         }
405     }
406 
407     @Override
flush()408     public void flush() {
409         synchronized (mNativeStreamLock) {
410             if (isDoneLocked()
411                     || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != State.WRITING)) {
412                 return;
413             }
414             if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
415                 // If there is no pending write when flush() is called, see if
416                 // request headers need to be flushed.
417                 if (!mRequestHeadersSent) {
418                     mRequestHeadersSent = true;
419                     CronetBidirectionalStreamJni.get().sendRequestHeaders(
420                             mNativeStream, CronetBidirectionalStream.this);
421                     if (!doesMethodAllowWriteData(mInitialMethod)) {
422                         mWriteState = State.WRITING_DONE;
423                     }
424                 }
425                 return;
426             }
427 
428             assert !mPendingData.isEmpty() || !mFlushData.isEmpty();
429 
430             // Move buffers from mPendingData to the flushing queue.
431             if (!mPendingData.isEmpty()) {
432                 mFlushData.addAll(mPendingData);
433                 mPendingData.clear();
434             }
435 
436             if (mWriteState == State.WRITING) {
437                 // If there is a write already pending, wait until onWritevCompleted is
438                 // called before pushing data to the native stack.
439                 return;
440             }
441             sendFlushDataLocked();
442         }
443     }
444 
445     // Helper method to send buffers in mFlushData. Caller needs to acquire
446     // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
447     // mFlushData queue isn't empty.
448     @SuppressWarnings("GuardedByChecker")
sendFlushDataLocked()449     private void sendFlushDataLocked() {
450         assert mWriteState == State.WAITING_FOR_FLUSH;
451         int size = mFlushData.size();
452         ByteBuffer[] buffers = new ByteBuffer[size];
453         int[] positions = new int[size];
454         int[] limits = new int[size];
455         for (int i = 0; i < size; i++) {
456             ByteBuffer buffer = mFlushData.poll();
457             buffers[i] = buffer;
458             positions[i] = buffer.position();
459             limits[i] = buffer.limit();
460         }
461         assert mFlushData.isEmpty();
462         assert buffers.length >= 1;
463         mWriteState = State.WRITING;
464         mRequestHeadersSent = true;
465         if (!CronetBidirectionalStreamJni.get().writevData(mNativeStream,
466                     CronetBidirectionalStream.this, buffers, positions, limits,
467                     mEndOfStreamWritten && mPendingData.isEmpty())) {
468             // Still waiting on flush. This is just to have consistent
469             // behavior with the other error cases.
470             mWriteState = State.WAITING_FOR_FLUSH;
471             throw new IllegalArgumentException("Unable to call native writev.");
472         }
473     }
474 
475     /**
476      * Returns a read-only copy of {@code mPendingData} for testing.
477      */
478     @VisibleForTesting
getPendingDataForTesting()479     public List<ByteBuffer> getPendingDataForTesting() {
480         synchronized (mNativeStreamLock) {
481             List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>();
482             for (ByteBuffer buffer : mPendingData) {
483                 pendingData.add(buffer.asReadOnlyBuffer());
484             }
485             return pendingData;
486         }
487     }
488 
489     /**
490      * Returns a read-only copy of {@code mFlushData} for testing.
491      */
492     @VisibleForTesting
getFlushDataForTesting()493     public List<ByteBuffer> getFlushDataForTesting() {
494         synchronized (mNativeStreamLock) {
495             List<ByteBuffer> flushData = new LinkedList<ByteBuffer>();
496             for (ByteBuffer buffer : mFlushData) {
497                 flushData.add(buffer.asReadOnlyBuffer());
498             }
499             return flushData;
500         }
501     }
502 
503     @Override
cancel()504     public void cancel() {
505         synchronized (mNativeStreamLock) {
506             if (isDoneLocked() || mReadState == State.NOT_STARTED) {
507                 return;
508             }
509             mReadState = mWriteState = State.CANCELED;
510             destroyNativeStreamLocked(true);
511         }
512     }
513 
514     @Override
isDone()515     public boolean isDone() {
516         synchronized (mNativeStreamLock) {
517             return isDoneLocked();
518         }
519     }
520 
521     @GuardedBy("mNativeStreamLock")
isDoneLocked()522     private boolean isDoneLocked() {
523         return mReadState != State.NOT_STARTED && mNativeStream == 0;
524     }
525 
526     /*
527      * Runs an onSucceeded callback if both Read and Write sides are closed.
528      */
maybeOnSucceededOnExecutor()529     private void maybeOnSucceededOnExecutor() {
530         synchronized (mNativeStreamLock) {
531             if (isDoneLocked()) {
532                 return;
533             }
534             if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) {
535                 return;
536             }
537             mReadState = mWriteState = State.SUCCESS;
538             // Destroy native stream first, so UrlRequestContext could be shut
539             // down from the listener.
540             destroyNativeStreamLocked(false);
541         }
542         try {
543             mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
544         } catch (Exception e) {
545             Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
546         }
547     }
548 
549     @SuppressWarnings("unused")
550     @CalledByNative
onStreamReady(final boolean requestHeadersSent)551     private void onStreamReady(final boolean requestHeadersSent) {
552         postTaskToExecutor(new Runnable() {
553             @Override
554             public void run() {
555                 synchronized (mNativeStreamLock) {
556                     if (isDoneLocked()) {
557                         return;
558                     }
559                     mRequestHeadersSent = requestHeadersSent;
560                     mReadState = State.WAITING_FOR_READ;
561                     if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHeadersSent) {
562                         mWriteState = State.WRITING_DONE;
563                     } else {
564                         mWriteState = State.WAITING_FOR_FLUSH;
565                     }
566                 }
567 
568                 try {
569                     mCallback.onStreamReady(CronetBidirectionalStream.this);
570                 } catch (Exception e) {
571                     onCallbackException(e);
572                 }
573             }
574         });
575     }
576 
577     /**
578      * Called when the final set of headers, after all redirects,
579      * is received. Can only be called once for each stream.
580      */
581     @SuppressWarnings("unused")
582     @CalledByNative
onResponseHeadersReceived(int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)583     private void onResponseHeadersReceived(int httpStatusCode, String negotiatedProtocol,
584             String[] headers, long receivedByteCount) {
585         try {
586             mResponseInfo = prepareResponseInfoOnNetworkThread(
587                     httpStatusCode, negotiatedProtocol, headers, receivedByteCount);
588         } catch (Exception e) {
589             failWithException(new CronetExceptionImpl("Cannot prepare ResponseInfo", null));
590             return;
591         }
592         postTaskToExecutor(new Runnable() {
593             @Override
594             public void run() {
595                 synchronized (mNativeStreamLock) {
596                     if (isDoneLocked()) {
597                         return;
598                     }
599                     mReadState = State.WAITING_FOR_READ;
600                 }
601 
602                 try {
603                     mCallback.onResponseHeadersReceived(
604                             CronetBidirectionalStream.this, mResponseInfo);
605                 } catch (Exception e) {
606                     onCallbackException(e);
607                 }
608             }
609         });
610     }
611 
612     @SuppressWarnings("unused")
613     @CalledByNative
onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition, int initialLimit, long receivedByteCount)614     private void onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition,
615             int initialLimit, long receivedByteCount) {
616         mResponseInfo.setReceivedByteCount(receivedByteCount);
617         if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
618             failWithException(
619                     new CronetExceptionImpl("ByteBuffer modified externally during read", null));
620             return;
621         }
622         if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) {
623             failWithException(new CronetExceptionImpl("Invalid number of bytes read", null));
624             return;
625         }
626         byteBuffer.position(initialPosition + bytesRead);
627         assert mOnReadCompletedTask.mByteBuffer == null;
628         mOnReadCompletedTask.mByteBuffer = byteBuffer;
629         mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
630         postTaskToExecutor(mOnReadCompletedTask);
631     }
632 
633     @SuppressWarnings("unused")
634     @CalledByNative
onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, int[] initialLimits, boolean endOfStream)635     private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions,
636             int[] initialLimits, boolean endOfStream) {
637         assert byteBuffers.length == initialPositions.length;
638         assert byteBuffers.length == initialLimits.length;
639         synchronized (mNativeStreamLock) {
640             if (isDoneLocked()) return;
641             mWriteState = State.WAITING_FOR_FLUSH;
642             // Flush if there is anything in the flush queue mFlushData.
643             if (!mFlushData.isEmpty()) {
644                 sendFlushDataLocked();
645             }
646         }
647         for (int i = 0; i < byteBuffers.length; i++) {
648             ByteBuffer buffer = byteBuffers[i];
649             if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
650                 failWithException(new CronetExceptionImpl(
651                         "ByteBuffer modified externally during write", null));
652                 return;
653             }
654             // Current implementation always writes the complete buffer.
655             buffer.position(buffer.limit());
656             postTaskToExecutor(new OnWriteCompletedRunnable(buffer,
657                     // Only set endOfStream flag if this buffer is the last in byteBuffers.
658                     endOfStream && i == byteBuffers.length - 1));
659         }
660     }
661 
662     @SuppressWarnings("unused")
663     @CalledByNative
onResponseTrailersReceived(String[] trailers)664     private void onResponseTrailersReceived(String[] trailers) {
665         final HeaderBlock trailersBlock = new HeaderBlockImpl(headersListFromStrings(trailers));
666         postTaskToExecutor(new Runnable() {
667             @Override
668             public void run() {
669                 synchronized (mNativeStreamLock) {
670                     if (isDoneLocked()) {
671                         return;
672                     }
673                 }
674                 try {
675                     mCallback.onResponseTrailersReceived(
676                             CronetBidirectionalStream.this, mResponseInfo, trailersBlock);
677                 } catch (Exception e) {
678                     onCallbackException(e);
679                 }
680             }
681         });
682     }
683 
684     @SuppressWarnings("unused")
685     @CalledByNative
onError(int errorCode, int nativeError, int nativeQuicError, String errorString, long receivedByteCount)686     private void onError(int errorCode, int nativeError, int nativeQuicError, String errorString,
687             long receivedByteCount) {
688         if (mResponseInfo != null) {
689             mResponseInfo.setReceivedByteCount(receivedByteCount);
690         }
691         if (errorCode == NetworkException.ERROR_QUIC_PROTOCOL_FAILED
692                 || errorCode == NetworkException.ERROR_NETWORK_CHANGED) {
693             failWithException(
694                     new QuicExceptionImpl("Exception in BidirectionalStream: " + errorString,
695                             errorCode, nativeError, nativeQuicError));
696         } else {
697             failWithException(new BidirectionalStreamNetworkException(
698                     "Exception in BidirectionalStream: " + errorString, errorCode, nativeError));
699         }
700     }
701 
702     /**
703      * Called when request is canceled, no callbacks will be called afterwards.
704      */
705     @SuppressWarnings("unused")
706     @CalledByNative
onCanceled()707     private void onCanceled() {
708         postTaskToExecutor(new Runnable() {
709             @Override
710             public void run() {
711                 try {
712                     mCallback.onCanceled(CronetBidirectionalStream.this, mResponseInfo);
713                 } catch (Exception e) {
714                     Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCanceled method", e);
715                 }
716             }
717         });
718     }
719 
720     /**
721     * Called by the native code to report metrics just before the native adapter is destroyed.
722     */
723     @SuppressWarnings("unused")
724     @CalledByNative
onMetricsCollected(long requestStartMs, long dnsStartMs, long dnsEndMs, long connectStartMs, long connectEndMs, long sslStartMs, long sslEndMs, long sendingStartMs, long sendingEndMs, long pushStartMs, long pushEndMs, long responseStartMs, long requestEndMs, boolean socketReused, long sentByteCount, long receivedByteCount)725     private void onMetricsCollected(long requestStartMs, long dnsStartMs, long dnsEndMs,
726             long connectStartMs, long connectEndMs, long sslStartMs, long sslEndMs,
727             long sendingStartMs, long sendingEndMs, long pushStartMs, long pushEndMs,
728             long responseStartMs, long requestEndMs, boolean socketReused, long sentByteCount,
729             long receivedByteCount) {
730         synchronized (mNativeStreamLock) {
731             if (mMetrics != null) {
732                 throw new IllegalStateException("Metrics collection should only happen once.");
733             }
734             mMetrics = new CronetMetrics(requestStartMs, dnsStartMs, dnsEndMs, connectStartMs,
735                     connectEndMs, sslStartMs, sslEndMs, sendingStartMs, sendingEndMs, pushStartMs,
736                     pushEndMs, responseStartMs, requestEndMs, socketReused, sentByteCount,
737                     receivedByteCount);
738             assert mReadState == mWriteState;
739             assert (mReadState == State.SUCCESS) || (mReadState == State.ERROR)
740                     || (mReadState == State.CANCELED);
741             int finishedReason;
742             if (mReadState == State.SUCCESS) {
743                 finishedReason = RequestFinishedInfo.SUCCEEDED;
744             } else if (mReadState == State.CANCELED) {
745                 finishedReason = RequestFinishedInfo.CANCELED;
746             } else {
747                 finishedReason = RequestFinishedInfo.FAILED;
748             }
749             final RequestFinishedInfo requestFinishedInfo = new RequestFinishedInfoImpl(mInitialUrl,
750                     mRequestAnnotations, mMetrics, finishedReason, mResponseInfo, mException);
751             mRequestContext.reportRequestFinished(requestFinishedInfo);
752         }
753     }
754 
755     @VisibleForTesting
setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting)756     public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) {
757         mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
758     }
759 
doesMethodAllowWriteData(String methodName)760     private static boolean doesMethodAllowWriteData(String methodName) {
761         return !methodName.equals("GET") && !methodName.equals("HEAD");
762     }
763 
headersListFromStrings(String[] headers)764     private static ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) {
765         ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(headers.length / 2);
766         for (int i = 0; i < headers.length; i += 2) {
767             headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], headers[i + 1]));
768         }
769         return headersList;
770     }
771 
stringsFromHeaderList(List<Map.Entry<String, String>> headersList)772     private static String[] stringsFromHeaderList(List<Map.Entry<String, String>> headersList) {
773         String headersArray[] = new String[headersList.size() * 2];
774         int i = 0;
775         for (Map.Entry<String, String> requestHeader : headersList) {
776             headersArray[i++] = requestHeader.getKey();
777             headersArray[i++] = requestHeader.getValue();
778         }
779         return headersArray;
780     }
781 
convertStreamPriority(@ronetEngineBase.StreamPriority int priority)782     private static int convertStreamPriority(@CronetEngineBase.StreamPriority int priority) {
783         switch (priority) {
784             case STREAM_PRIORITY_IDLE:
785                 return RequestPriority.IDLE;
786             case STREAM_PRIORITY_LOWEST:
787                 return RequestPriority.LOWEST;
788             case STREAM_PRIORITY_LOW:
789                 return RequestPriority.LOW;
790             case STREAM_PRIORITY_MEDIUM:
791                 return RequestPriority.MEDIUM;
792             case STREAM_PRIORITY_HIGHEST:
793                 return RequestPriority.HIGHEST;
794             default:
795                 throw new IllegalArgumentException("Invalid stream priority.");
796         }
797     }
798 
799     /**
800      * Posts task to application Executor. Used for callbacks
801      * and other tasks that should not be executed on network thread.
802      */
postTaskToExecutor(Runnable task)803     private void postTaskToExecutor(Runnable task) {
804         try {
805             mExecutor.execute(task);
806         } catch (RejectedExecutionException failException) {
807             Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to executor",
808                     failException);
809             // If posting a task throws an exception, then there is no choice
810             // but to destroy the stream without invoking the callback.
811             synchronized (mNativeStreamLock) {
812                 mReadState = mWriteState = State.ERROR;
813                 destroyNativeStreamLocked(false);
814             }
815         }
816     }
817 
prepareResponseInfoOnNetworkThread(int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)818     private UrlResponseInfoImpl prepareResponseInfoOnNetworkThread(int httpStatusCode,
819             String negotiatedProtocol, String[] headers, long receivedByteCount) {
820         UrlResponseInfoImpl responseInfo = new UrlResponseInfoImpl(Arrays.asList(mInitialUrl),
821                 httpStatusCode, "", headersListFromStrings(headers), false, negotiatedProtocol,
822                 null, receivedByteCount);
823         return responseInfo;
824     }
825 
826     @GuardedBy("mNativeStreamLock")
destroyNativeStreamLocked(boolean sendOnCanceled)827     private void destroyNativeStreamLocked(boolean sendOnCanceled) {
828         Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + this.toString());
829         if (mNativeStream == 0) {
830             return;
831         }
832         CronetBidirectionalStreamJni.get().destroy(
833                 mNativeStream, CronetBidirectionalStream.this, sendOnCanceled);
834         mRequestContext.onRequestDestroyed();
835         mNativeStream = 0;
836         if (mOnDestroyedCallbackForTesting != null) {
837             mOnDestroyedCallbackForTesting.run();
838         }
839     }
840 
841     /**
842      * Fails the stream with an exception. Only called on the Executor.
843      */
failWithExceptionOnExecutor(HttpException e)844     private void failWithExceptionOnExecutor(HttpException e) {
845         mException = e;
846         // Do not call into mCallback if request is complete.
847         synchronized (mNativeStreamLock) {
848             if (isDoneLocked()) {
849                 return;
850             }
851             mReadState = mWriteState = State.ERROR;
852             destroyNativeStreamLocked(false);
853         }
854         try {
855             mCallback.onFailed(this, mResponseInfo, e);
856         } catch (Exception failException) {
857             Log.e(CronetUrlRequestContext.LOG_TAG, "Exception notifying of failed request",
858                     failException);
859         }
860     }
861 
862     /**
863      * If callback method throws an exception, stream gets canceled
864      * and exception is reported via onFailed callback.
865      * Only called on the Executor.
866      */
onCallbackException(Exception e)867     private void onCallbackException(Exception e) {
868         CallbackException streamError =
869                 new CallbackExceptionImpl("CalledByNative method has thrown an exception", e);
870         Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative method", e);
871         failWithExceptionOnExecutor(streamError);
872     }
873 
874     /**
875      * Fails the stream with an exception. Can be called on any thread.
876      */
failWithException(final HttpException exception)877     private void failWithException(final HttpException exception) {
878         postTaskToExecutor(new Runnable() {
879             @Override
880             public void run() {
881                 failWithExceptionOnExecutor(exception);
882             }
883         });
884     }
885 
886     @NativeMethods
887     interface Natives {
888         // Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
createBidirectionalStream(CronetBidirectionalStream caller, long urlRequestContextAdapter, boolean sendRequestHeadersAutomatically, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)889         long createBidirectionalStream(CronetBidirectionalStream caller,
890                 long urlRequestContextAdapter, boolean sendRequestHeadersAutomatically,
891                 boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet,
892                 int trafficStatsUid, long networkHandle);
893 
894         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
start(long nativePtr, CronetBidirectionalStream caller, String url, int priority, String method, String[] headers, boolean endOfStream)895         int start(long nativePtr, CronetBidirectionalStream caller, String url, int priority,
896                 String method, String[] headers, boolean endOfStream);
897 
898         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller)899         void sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller);
900 
901         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
readData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer byteBuffer, int position, int limit)902         boolean readData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer byteBuffer,
903                 int position, int limit);
904 
905         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
writevData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer[] buffers, int[] positions, int[] limits, boolean endOfStream)906         boolean writevData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer[] buffers,
907                 int[] positions, int[] limits, boolean endOfStream);
908 
909         @NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled)910         void destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled);
911     }
912 }
913