1 // Copyright 2015 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.net.impl; 6 7 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_IDLE; 8 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_LOWEST; 9 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_LOW; 10 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_MEDIUM; 11 import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_HIGHEST; 12 13 import androidx.annotation.IntDef; 14 import androidx.annotation.VisibleForTesting; 15 16 import org.jni_zero.CalledByNative; 17 import org.jni_zero.JNINamespace; 18 import org.jni_zero.NativeClassQualifiedName; 19 import org.jni_zero.NativeMethods; 20 21 import org.chromium.base.Log; 22 import org.chromium.net.BidirectionalStream; 23 import org.chromium.net.CallbackException; 24 import org.chromium.net.CronetException; 25 import org.chromium.net.ExperimentalBidirectionalStream; 26 import org.chromium.net.NetworkException; 27 import org.chromium.net.RequestFinishedInfo; 28 import org.chromium.net.RequestPriority; 29 import org.chromium.net.UrlResponseInfo; 30 31 import java.lang.annotation.Retention; 32 import java.lang.annotation.RetentionPolicy; 33 import java.nio.ByteBuffer; 34 import java.util.AbstractMap; 35 import java.util.ArrayList; 36 import java.util.Arrays; 37 import java.util.Collection; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.Map; 41 import java.util.concurrent.Executor; 42 import java.util.concurrent.RejectedExecutionException; 43 44 import javax.annotation.concurrent.GuardedBy; 45 46 /** 47 * {@link BidirectionalStream} implementation using Chromium network stack. 48 * All @CalledByNative methods are called on the native network thread 49 * and post tasks with callback calls onto Executor. Upon returning from callback, the native 50 * stream is called on Executor thread and posts native tasks to the native network thread. 51 */ 52 @JNINamespace("cronet") 53 @VisibleForTesting 54 public class CronetBidirectionalStream extends ExperimentalBidirectionalStream { 55 /** 56 * States of BidirectionalStream are tracked in mReadState and mWriteState. 57 * The write state is separated out as it changes independently of the read state. 58 * There is one initial state: State.NOT_STARTED. There is one normal final state: 59 * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. There are two 60 * exceptional final states: State.CANCELED and State.ERROR, which can be reached from 61 * any other non-final state. 62 */ 63 @IntDef({ 64 State.NOT_STARTED, 65 State.STARTED, 66 State.WAITING_FOR_READ, 67 State.READING, 68 State.READING_DONE, 69 State.CANCELED, 70 State.ERROR, 71 State.SUCCESS, 72 State.WAITING_FOR_FLUSH, 73 State.WRITING, 74 State.WRITING_DONE 75 }) 76 @Retention(RetentionPolicy.SOURCE) 77 private @interface State { 78 /* Initial state, stream not started. */ 79 int NOT_STARTED = 0; 80 /* 81 * Stream started, request headers are being sent if mDelayRequestHeadersUntilNextFlush 82 * is not set to true. 83 */ 84 int STARTED = 1; 85 /* Waiting for {@code read()} to be called. */ 86 int WAITING_FOR_READ = 2; 87 /* Reading from the remote, {@code onReadCompleted()} callback will be called when done. */ 88 int READING = 3; 89 /* There is no more data to read and stream is half-closed by the remote side. */ 90 int READING_DONE = 4; 91 /* Stream is canceled. */ 92 int CANCELED = 5; 93 /* Error has occurred, stream is closed. */ 94 int ERROR = 6; 95 /* Reading and writing are done, and the stream is closed successfully. */ 96 int SUCCESS = 7; 97 /* Waiting for {@code CronetBidirectionalStreamJni.get().sendRequestHeaders()} or {@code 98 CronetBidirectionalStreamJni.get().writevData()} to be called. */ 99 int WAITING_FOR_FLUSH = 8; 100 /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */ 101 int WRITING = 9; 102 /* There is no more data to write and stream is half-closed by the local side. */ 103 int WRITING_DONE = 10; 104 } 105 106 private final CronetUrlRequestContext mRequestContext; 107 private final Executor mExecutor; 108 private final VersionSafeCallbacks.BidirectionalStreamCallback mCallback; 109 private final String mInitialUrl; 110 private final int mInitialPriority; 111 private final String mInitialMethod; 112 private final String mRequestHeaders[]; 113 private final UrlResponseInfo.HeaderBlock mRequestHeaderBlock; 114 private final boolean mDelayRequestHeadersUntilFirstFlush; 115 private final Collection<Object> mRequestAnnotations; 116 private final boolean mTrafficStatsTagSet; 117 private final int mTrafficStatsTag; 118 private final boolean mTrafficStatsUidSet; 119 private final int mTrafficStatsUid; 120 private final long mNetworkHandle; 121 private RefCountDelegate mInflightDoneCallbackCount; 122 private CronetException mException; 123 124 /* 125 * Synchronizes access to mNativeStream, mReadState and mWriteState. 126 */ 127 private final Object mNativeStreamLock = new Object(); 128 129 @GuardedBy("mNativeStreamLock") 130 // Pending write data. 131 private LinkedList<ByteBuffer> mPendingData; 132 133 @GuardedBy("mNativeStreamLock") 134 // Flush data queue that should be pushed to the native stack when the previous 135 // CronetBidirectionalStreamJni.get().writevData completes. 136 private LinkedList<ByteBuffer> mFlushData; 137 138 @GuardedBy("mNativeStreamLock") 139 // Whether an end-of-stream flag is passed in through write(). 140 private boolean mEndOfStreamWritten; 141 142 @GuardedBy("mNativeStreamLock") 143 // Whether request headers have been sent. 144 private boolean mRequestHeadersSent; 145 146 @GuardedBy("mNativeStreamLock") 147 // Metrics information. Obtained when request succeeds, fails or is canceled. 148 private RequestFinishedInfo.Metrics mMetrics; 149 150 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ 151 @GuardedBy("mNativeStreamLock") 152 private long mNativeStream; 153 154 /** 155 * Read state is tracking reading flow. 156 * / <--- READING <--- \ 157 * | | 158 * \ / 159 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS 160 */ 161 @GuardedBy("mNativeStreamLock") 162 private @State int mReadState = State.NOT_STARTED; 163 164 /** 165 * Write state is tracking writing flow. 166 * / <--- WRITING <--- \ 167 * | | 168 * \ / 169 * NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS 170 */ 171 @GuardedBy("mNativeStreamLock") 172 private @State int mWriteState = State.NOT_STARTED; 173 174 // Only modified on the network thread. 175 private UrlResponseInfoImpl mResponseInfo; 176 177 /* 178 * OnReadCompleted callback is repeatedly invoked when each read is completed, so it 179 * is cached as a member variable. 180 */ 181 // Only modified on the network thread. 182 private OnReadCompletedRunnable mOnReadCompletedTask; 183 184 private Runnable mOnDestroyedCallbackForTesting; 185 186 private final class OnReadCompletedRunnable implements Runnable { 187 // Buffer passed back from current invocation of onReadCompleted. 188 ByteBuffer mByteBuffer; 189 // End of stream flag from current invocation of onReadCompleted. 190 boolean mEndOfStream; 191 192 @Override run()193 public void run() { 194 try { 195 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 196 ByteBuffer buffer = mByteBuffer; 197 mByteBuffer = null; 198 boolean maybeOnSucceeded = false; 199 synchronized (mNativeStreamLock) { 200 if (isDoneLocked()) { 201 return; 202 } 203 if (mEndOfStream) { 204 mReadState = State.READING_DONE; 205 maybeOnSucceeded = (mWriteState == State.WRITING_DONE); 206 } else { 207 mReadState = State.WAITING_FOR_READ; 208 } 209 } 210 mCallback.onReadCompleted( 211 CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); 212 if (maybeOnSucceeded) { 213 maybeOnSucceededOnExecutor(); 214 } 215 } catch (Exception e) { 216 onCallbackException(e); 217 } 218 } 219 } 220 221 private final class OnWriteCompletedRunnable implements Runnable { 222 // Buffer passed back from current invocation of onWriteCompleted. 223 private ByteBuffer mByteBuffer; 224 // End of stream flag from current call to write. 225 private final boolean mEndOfStream; 226 OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream)227 OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) { 228 mByteBuffer = buffer; 229 mEndOfStream = endOfStream; 230 } 231 232 @Override run()233 public void run() { 234 try { 235 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 236 ByteBuffer buffer = mByteBuffer; 237 mByteBuffer = null; 238 boolean maybeOnSucceeded = false; 239 synchronized (mNativeStreamLock) { 240 if (isDoneLocked()) { 241 return; 242 } 243 if (mEndOfStream) { 244 mWriteState = State.WRITING_DONE; 245 maybeOnSucceeded = (mReadState == State.READING_DONE); 246 } 247 } 248 mCallback.onWriteCompleted( 249 CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); 250 if (maybeOnSucceeded) { 251 maybeOnSucceededOnExecutor(); 252 } 253 } catch (Exception e) { 254 onCallbackException(e); 255 } 256 } 257 } 258 CronetBidirectionalStream( CronetUrlRequestContext requestContext, String url, @CronetEngineBase.StreamPriority int priority, Callback callback, Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, boolean delayRequestHeadersUntilNextFlush, Collection<Object> requestAnnotations, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)259 CronetBidirectionalStream( 260 CronetUrlRequestContext requestContext, 261 String url, 262 @CronetEngineBase.StreamPriority int priority, 263 Callback callback, 264 Executor executor, 265 String httpMethod, 266 List<Map.Entry<String, String>> requestHeaders, 267 boolean delayRequestHeadersUntilNextFlush, 268 Collection<Object> requestAnnotations, 269 boolean trafficStatsTagSet, 270 int trafficStatsTag, 271 boolean trafficStatsUidSet, 272 int trafficStatsUid, 273 long networkHandle) { 274 mRequestContext = requestContext; 275 mInitialUrl = url; 276 mInitialPriority = convertStreamPriority(priority); 277 mCallback = new VersionSafeCallbacks.BidirectionalStreamCallback(callback); 278 mExecutor = executor; 279 mInitialMethod = httpMethod; 280 mRequestHeaders = stringsFromHeaderList(requestHeaders); 281 mRequestHeaderBlock = new UrlResponseInfoImpl.HeaderBlockImpl(requestHeaders); 282 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush; 283 mPendingData = new LinkedList<>(); 284 mFlushData = new LinkedList<>(); 285 mRequestAnnotations = requestAnnotations; 286 mTrafficStatsTagSet = trafficStatsTagSet; 287 mTrafficStatsTag = trafficStatsTag; 288 mTrafficStatsUidSet = trafficStatsUidSet; 289 mTrafficStatsUid = trafficStatsUid; 290 mNetworkHandle = networkHandle; 291 } 292 293 @Override getHttpMethod()294 public String getHttpMethod() { 295 return mInitialMethod; 296 } 297 298 @Override hasTrafficStatsTag()299 public boolean hasTrafficStatsTag() { 300 return mTrafficStatsTagSet; 301 } 302 303 @Override getTrafficStatsTag()304 public int getTrafficStatsTag() { 305 if (!hasTrafficStatsTag()) { 306 throw new IllegalStateException("TrafficStatsTag is not set"); 307 } 308 return mTrafficStatsTag; 309 } 310 311 @Override hasTrafficStatsUid()312 public boolean hasTrafficStatsUid() { 313 return mTrafficStatsUidSet; 314 } 315 316 @Override getTrafficStatsUid()317 public int getTrafficStatsUid() { 318 if (!hasTrafficStatsUid()) { 319 throw new IllegalStateException("TrafficStatsUid is not set"); 320 } 321 return mTrafficStatsUid; 322 } 323 324 @Override getHeaders()325 public UrlResponseInfo.HeaderBlock getHeaders() { 326 return mRequestHeaderBlock; 327 } 328 329 @Override getPriority()330 public int getPriority() { 331 switch (mInitialPriority) { 332 case RequestPriority.IDLE: 333 return STREAM_PRIORITY_IDLE; 334 case RequestPriority.LOWEST: 335 return STREAM_PRIORITY_LOWEST; 336 case RequestPriority.LOW: 337 return STREAM_PRIORITY_LOW; 338 case RequestPriority.MEDIUM: 339 return STREAM_PRIORITY_MEDIUM; 340 case RequestPriority.HIGHEST: 341 return STREAM_PRIORITY_HIGHEST; 342 default: 343 throw new IllegalStateException("Invalid stream priority: " + mInitialPriority); 344 } 345 } 346 347 @Override isDelayRequestHeadersUntilFirstFlushEnabled()348 public boolean isDelayRequestHeadersUntilFirstFlushEnabled() { 349 return mDelayRequestHeadersUntilFirstFlush; 350 } 351 352 @Override start()353 public void start() { 354 synchronized (mNativeStreamLock) { 355 if (mReadState != State.NOT_STARTED) { 356 throw new IllegalStateException("Stream is already started."); 357 } 358 try { 359 mNativeStream = 360 CronetBidirectionalStreamJni.get() 361 .createBidirectionalStream( 362 CronetBidirectionalStream.this, 363 mRequestContext.getUrlRequestContextAdapter(), 364 !mDelayRequestHeadersUntilFirstFlush, 365 mTrafficStatsTagSet, 366 mTrafficStatsTag, 367 mTrafficStatsUidSet, 368 mTrafficStatsUid, 369 mNetworkHandle); 370 mRequestContext.onRequestStarted(); 371 mInflightDoneCallbackCount = 372 new RefCountDelegate(mRequestContext::onRequestFinished); 373 // We need an initial count of 2: one decrement for the final callback 374 // (e.g. onSucceeded), and another for onMetricsCollected(). 375 mInflightDoneCallbackCount.increment(); 376 // Non-zero startResult means an argument error. 377 int startResult = 378 CronetBidirectionalStreamJni.get() 379 .start( 380 mNativeStream, 381 CronetBidirectionalStream.this, 382 mInitialUrl, 383 mInitialPriority, 384 mInitialMethod, 385 mRequestHeaders, 386 !doesMethodAllowWriteData(mInitialMethod)); 387 if (startResult == -1) { 388 throw new IllegalArgumentException("Invalid http method " + mInitialMethod); 389 } 390 if (startResult > 0) { 391 int headerPos = startResult - 1; 392 throw new IllegalArgumentException( 393 "Invalid header with headername: " + mRequestHeaders[headerPos]); 394 } 395 mReadState = mWriteState = State.STARTED; 396 } catch (RuntimeException e) { 397 // If there's an exception, clean up and then throw the 398 // exception to the caller. 399 destroyNativeStreamLocked(false); 400 mInflightDoneCallbackCount.decrement(); 401 mInflightDoneCallbackCount.decrement(); 402 throw e; 403 } 404 } 405 } 406 407 @Override read(ByteBuffer buffer)408 public void read(ByteBuffer buffer) { 409 synchronized (mNativeStreamLock) { 410 Preconditions.checkHasRemaining(buffer); 411 Preconditions.checkDirect(buffer); 412 if (mReadState != State.WAITING_FOR_READ) { 413 throw new IllegalStateException("Unexpected read attempt."); 414 } 415 if (isDoneLocked()) { 416 return; 417 } 418 if (mOnReadCompletedTask == null) { 419 mOnReadCompletedTask = new OnReadCompletedRunnable(); 420 } 421 mReadState = State.READING; 422 if (!CronetBidirectionalStreamJni.get() 423 .readData( 424 mNativeStream, 425 CronetBidirectionalStream.this, 426 buffer, 427 buffer.position(), 428 buffer.limit())) { 429 // Still waiting on read. This is just to have consistent 430 // behavior with the other error cases. 431 mReadState = State.WAITING_FOR_READ; 432 throw new IllegalArgumentException("Unable to call native read"); 433 } 434 } 435 } 436 437 @Override write(ByteBuffer buffer, boolean endOfStream)438 public void write(ByteBuffer buffer, boolean endOfStream) { 439 synchronized (mNativeStreamLock) { 440 Preconditions.checkDirect(buffer); 441 if (!buffer.hasRemaining() && !endOfStream) { 442 throw new IllegalArgumentException("Empty buffer before end of stream."); 443 } 444 if (mEndOfStreamWritten) { 445 throw new IllegalArgumentException("Write after writing end of stream."); 446 } 447 if (isDoneLocked()) { 448 return; 449 } 450 mPendingData.add(buffer); 451 if (endOfStream) { 452 mEndOfStreamWritten = true; 453 } 454 } 455 } 456 457 @Override flush()458 public void flush() { 459 synchronized (mNativeStreamLock) { 460 if (isDoneLocked() 461 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != State.WRITING)) { 462 return; 463 } 464 if (mPendingData.isEmpty() && mFlushData.isEmpty()) { 465 // If there is no pending write when flush() is called, see if 466 // request headers need to be flushed. 467 if (!mRequestHeadersSent) { 468 mRequestHeadersSent = true; 469 CronetBidirectionalStreamJni.get() 470 .sendRequestHeaders(mNativeStream, CronetBidirectionalStream.this); 471 if (!doesMethodAllowWriteData(mInitialMethod)) { 472 mWriteState = State.WRITING_DONE; 473 } 474 } 475 return; 476 } 477 478 assert !mPendingData.isEmpty() || !mFlushData.isEmpty(); 479 480 // Move buffers from mPendingData to the flushing queue. 481 if (!mPendingData.isEmpty()) { 482 mFlushData.addAll(mPendingData); 483 mPendingData.clear(); 484 } 485 486 if (mWriteState == State.WRITING) { 487 // If there is a write already pending, wait until onWritevCompleted is 488 // called before pushing data to the native stack. 489 return; 490 } 491 sendFlushDataLocked(); 492 } 493 } 494 495 // Helper method to send buffers in mFlushData. Caller needs to acquire 496 // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and 497 // mFlushData queue isn't empty. 498 @SuppressWarnings("GuardedByChecker") sendFlushDataLocked()499 private void sendFlushDataLocked() { 500 assert mWriteState == State.WAITING_FOR_FLUSH; 501 int size = mFlushData.size(); 502 ByteBuffer[] buffers = new ByteBuffer[size]; 503 int[] positions = new int[size]; 504 int[] limits = new int[size]; 505 for (int i = 0; i < size; i++) { 506 ByteBuffer buffer = mFlushData.poll(); 507 buffers[i] = buffer; 508 positions[i] = buffer.position(); 509 limits[i] = buffer.limit(); 510 } 511 assert mFlushData.isEmpty(); 512 assert buffers.length >= 1; 513 mWriteState = State.WRITING; 514 mRequestHeadersSent = true; 515 if (!CronetBidirectionalStreamJni.get() 516 .writevData( 517 mNativeStream, 518 CronetBidirectionalStream.this, 519 buffers, 520 positions, 521 limits, 522 mEndOfStreamWritten && mPendingData.isEmpty())) { 523 // Still waiting on flush. This is just to have consistent 524 // behavior with the other error cases. 525 mWriteState = State.WAITING_FOR_FLUSH; 526 throw new IllegalArgumentException("Unable to call native writev."); 527 } 528 } 529 530 /** Returns a read-only copy of {@code mPendingData} for testing. */ getPendingDataForTesting()531 public List<ByteBuffer> getPendingDataForTesting() { 532 synchronized (mNativeStreamLock) { 533 List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>(); 534 for (ByteBuffer buffer : mPendingData) { 535 pendingData.add(buffer.asReadOnlyBuffer()); 536 } 537 return pendingData; 538 } 539 } 540 541 /** Returns a read-only copy of {@code mFlushData} for testing. */ getFlushDataForTesting()542 public List<ByteBuffer> getFlushDataForTesting() { 543 synchronized (mNativeStreamLock) { 544 List<ByteBuffer> flushData = new LinkedList<ByteBuffer>(); 545 for (ByteBuffer buffer : mFlushData) { 546 flushData.add(buffer.asReadOnlyBuffer()); 547 } 548 return flushData; 549 } 550 } 551 552 @Override cancel()553 public void cancel() { 554 synchronized (mNativeStreamLock) { 555 if (isDoneLocked() || mReadState == State.NOT_STARTED) { 556 return; 557 } 558 mReadState = mWriteState = State.CANCELED; 559 destroyNativeStreamLocked(true); 560 } 561 } 562 563 @Override isDone()564 public boolean isDone() { 565 synchronized (mNativeStreamLock) { 566 return isDoneLocked(); 567 } 568 } 569 570 @GuardedBy("mNativeStreamLock") isDoneLocked()571 private boolean isDoneLocked() { 572 return mReadState != State.NOT_STARTED && mNativeStream == 0; 573 } 574 575 /* 576 * Runs an onSucceeded callback if both Read and Write sides are closed. 577 */ maybeOnSucceededOnExecutor()578 private void maybeOnSucceededOnExecutor() { 579 synchronized (mNativeStreamLock) { 580 if (isDoneLocked()) { 581 return; 582 } 583 if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) { 584 return; 585 } 586 mReadState = mWriteState = State.SUCCESS; 587 // Destroy native stream first, so UrlRequestContext could be shut 588 // down from the listener. 589 destroyNativeStreamLocked(false); 590 } 591 try { 592 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo); 593 } catch (Exception e) { 594 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e); 595 } 596 mInflightDoneCallbackCount.decrement(); 597 } 598 599 @SuppressWarnings("unused") 600 @CalledByNative onStreamReady(final boolean requestHeadersSent)601 private void onStreamReady(final boolean requestHeadersSent) { 602 postTaskToExecutor( 603 new Runnable() { 604 @Override 605 public void run() { 606 synchronized (mNativeStreamLock) { 607 if (isDoneLocked()) { 608 return; 609 } 610 mRequestHeadersSent = requestHeadersSent; 611 mReadState = State.WAITING_FOR_READ; 612 if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHeadersSent) { 613 mWriteState = State.WRITING_DONE; 614 } else { 615 mWriteState = State.WAITING_FOR_FLUSH; 616 } 617 } 618 619 try { 620 mCallback.onStreamReady(CronetBidirectionalStream.this); 621 } catch (Exception e) { 622 onCallbackException(e); 623 } 624 } 625 }); 626 } 627 628 /** 629 * Called when the final set of headers, after all redirects, 630 * is received. Can only be called once for each stream. 631 */ 632 @SuppressWarnings("unused") 633 @CalledByNative onResponseHeadersReceived( int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)634 private void onResponseHeadersReceived( 635 int httpStatusCode, 636 String negotiatedProtocol, 637 String[] headers, 638 long receivedByteCount) { 639 try { 640 mResponseInfo = 641 prepareResponseInfoOnNetworkThread( 642 httpStatusCode, negotiatedProtocol, headers, receivedByteCount); 643 } catch (Exception e) { 644 failWithException(new CronetExceptionImpl("Cannot prepare ResponseInfo", null)); 645 return; 646 } 647 postTaskToExecutor( 648 new Runnable() { 649 @Override 650 public void run() { 651 synchronized (mNativeStreamLock) { 652 if (isDoneLocked()) { 653 return; 654 } 655 mReadState = State.WAITING_FOR_READ; 656 } 657 658 try { 659 mCallback.onResponseHeadersReceived( 660 CronetBidirectionalStream.this, mResponseInfo); 661 } catch (Exception e) { 662 onCallbackException(e); 663 } 664 } 665 }); 666 } 667 668 @SuppressWarnings("unused") 669 @CalledByNative onReadCompleted( final ByteBuffer byteBuffer, int bytesRead, int initialPosition, int initialLimit, long receivedByteCount)670 private void onReadCompleted( 671 final ByteBuffer byteBuffer, 672 int bytesRead, 673 int initialPosition, 674 int initialLimit, 675 long receivedByteCount) { 676 mResponseInfo.setReceivedByteCount(receivedByteCount); 677 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { 678 failWithException( 679 new CronetExceptionImpl("ByteBuffer modified externally during read", null)); 680 return; 681 } 682 if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) { 683 failWithException(new CronetExceptionImpl("Invalid number of bytes read", null)); 684 return; 685 } 686 byteBuffer.position(initialPosition + bytesRead); 687 assert mOnReadCompletedTask.mByteBuffer == null; 688 mOnReadCompletedTask.mByteBuffer = byteBuffer; 689 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0); 690 postTaskToExecutor(mOnReadCompletedTask); 691 } 692 693 @SuppressWarnings("unused") 694 @CalledByNative onWritevCompleted( final ByteBuffer[] byteBuffers, int[] initialPositions, int[] initialLimits, boolean endOfStream)695 private void onWritevCompleted( 696 final ByteBuffer[] byteBuffers, 697 int[] initialPositions, 698 int[] initialLimits, 699 boolean endOfStream) { 700 assert byteBuffers.length == initialPositions.length; 701 assert byteBuffers.length == initialLimits.length; 702 synchronized (mNativeStreamLock) { 703 if (isDoneLocked()) return; 704 mWriteState = State.WAITING_FOR_FLUSH; 705 // Flush if there is anything in the flush queue mFlushData. 706 if (!mFlushData.isEmpty()) { 707 sendFlushDataLocked(); 708 } 709 } 710 for (int i = 0; i < byteBuffers.length; i++) { 711 ByteBuffer buffer = byteBuffers[i]; 712 if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { 713 failWithException( 714 new CronetExceptionImpl( 715 "ByteBuffer modified externally during write", null)); 716 return; 717 } 718 // Current implementation always writes the complete buffer. 719 buffer.position(buffer.limit()); 720 postTaskToExecutor( 721 new OnWriteCompletedRunnable( 722 buffer, 723 // Only set endOfStream flag if this buffer is the last in byteBuffers. 724 endOfStream && i == byteBuffers.length - 1)); 725 } 726 } 727 728 @SuppressWarnings("unused") 729 @CalledByNative onResponseTrailersReceived(String[] trailers)730 private void onResponseTrailersReceived(String[] trailers) { 731 final UrlResponseInfo.HeaderBlock trailersBlock = 732 new UrlResponseInfoImpl.HeaderBlockImpl(headersListFromStrings(trailers)); 733 postTaskToExecutor( 734 new Runnable() { 735 @Override 736 public void run() { 737 synchronized (mNativeStreamLock) { 738 if (isDoneLocked()) { 739 return; 740 } 741 } 742 try { 743 mCallback.onResponseTrailersReceived( 744 CronetBidirectionalStream.this, mResponseInfo, trailersBlock); 745 } catch (Exception e) { 746 onCallbackException(e); 747 } 748 } 749 }); 750 } 751 752 @SuppressWarnings("unused") 753 @CalledByNative onError( int errorCode, int nativeError, int nativeQuicError, String errorString, long receivedByteCount)754 private void onError( 755 int errorCode, 756 int nativeError, 757 int nativeQuicError, 758 String errorString, 759 long receivedByteCount) { 760 if (mResponseInfo != null) { 761 mResponseInfo.setReceivedByteCount(receivedByteCount); 762 } 763 if (errorCode == NetworkException.ERROR_QUIC_PROTOCOL_FAILED 764 || errorCode == NetworkException.ERROR_NETWORK_CHANGED) { 765 failWithException( 766 new QuicExceptionImpl( 767 "Exception in BidirectionalStream: " + errorString, 768 errorCode, 769 nativeError, 770 nativeQuicError)); 771 } else { 772 failWithException( 773 new BidirectionalStreamNetworkException( 774 "Exception in BidirectionalStream: " + errorString, 775 errorCode, 776 nativeError)); 777 } 778 } 779 780 /** Called when request is canceled, no callbacks will be called afterwards. */ 781 @SuppressWarnings("unused") 782 @CalledByNative onCanceled()783 private void onCanceled() { 784 postTaskToExecutor( 785 new Runnable() { 786 @Override 787 public void run() { 788 try { 789 mCallback.onCanceled(CronetBidirectionalStream.this, mResponseInfo); 790 } catch (Exception e) { 791 Log.e( 792 CronetUrlRequestContext.LOG_TAG, 793 "Exception in onCanceled method", 794 e); 795 } 796 mInflightDoneCallbackCount.decrement(); 797 } 798 }); 799 } 800 801 /** Called by the native code to report metrics just before the native adapter is destroyed. */ 802 @SuppressWarnings("unused") 803 @CalledByNative onMetricsCollected( long requestStartMs, long dnsStartMs, long dnsEndMs, long connectStartMs, long connectEndMs, long sslStartMs, long sslEndMs, long sendingStartMs, long sendingEndMs, long pushStartMs, long pushEndMs, long responseStartMs, long requestEndMs, boolean socketReused, long sentByteCount, long receivedByteCount)804 private void onMetricsCollected( 805 long requestStartMs, 806 long dnsStartMs, 807 long dnsEndMs, 808 long connectStartMs, 809 long connectEndMs, 810 long sslStartMs, 811 long sslEndMs, 812 long sendingStartMs, 813 long sendingEndMs, 814 long pushStartMs, 815 long pushEndMs, 816 long responseStartMs, 817 long requestEndMs, 818 boolean socketReused, 819 long sentByteCount, 820 long receivedByteCount) { 821 synchronized (mNativeStreamLock) { 822 try { 823 if (mMetrics != null) { 824 throw new IllegalStateException("Metrics collection should only happen once."); 825 } 826 mMetrics = 827 new CronetMetrics( 828 requestStartMs, 829 dnsStartMs, 830 dnsEndMs, 831 connectStartMs, 832 connectEndMs, 833 sslStartMs, 834 sslEndMs, 835 sendingStartMs, 836 sendingEndMs, 837 pushStartMs, 838 pushEndMs, 839 responseStartMs, 840 requestEndMs, 841 socketReused, 842 sentByteCount, 843 receivedByteCount); 844 assert mReadState == mWriteState; 845 assert (mReadState == State.SUCCESS) 846 || (mReadState == State.ERROR) 847 || (mReadState == State.CANCELED); 848 int finishedReason; 849 if (mReadState == State.SUCCESS) { 850 finishedReason = RequestFinishedInfo.SUCCEEDED; 851 } else if (mReadState == State.CANCELED) { 852 finishedReason = RequestFinishedInfo.CANCELED; 853 } else { 854 finishedReason = RequestFinishedInfo.FAILED; 855 } 856 final RequestFinishedInfo requestFinishedInfo = 857 new RequestFinishedInfoImpl( 858 mInitialUrl, 859 mRequestAnnotations, 860 mMetrics, 861 finishedReason, 862 mResponseInfo, 863 mException); 864 mRequestContext.reportRequestFinished( 865 requestFinishedInfo, mInflightDoneCallbackCount); 866 } finally { 867 mInflightDoneCallbackCount.decrement(); 868 } 869 } 870 } 871 setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting)872 public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) { 873 mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting; 874 } 875 doesMethodAllowWriteData(String methodName)876 private static boolean doesMethodAllowWriteData(String methodName) { 877 return !methodName.equals("GET") && !methodName.equals("HEAD"); 878 } 879 headersListFromStrings(String[] headers)880 private static ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) { 881 ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(headers.length / 2); 882 for (int i = 0; i < headers.length; i += 2) { 883 headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], headers[i + 1])); 884 } 885 return headersList; 886 } 887 stringsFromHeaderList(List<Map.Entry<String, String>> headersList)888 private static String[] stringsFromHeaderList(List<Map.Entry<String, String>> headersList) { 889 String headersArray[] = new String[headersList.size() * 2]; 890 int i = 0; 891 for (Map.Entry<String, String> requestHeader : headersList) { 892 headersArray[i++] = requestHeader.getKey(); 893 headersArray[i++] = requestHeader.getValue(); 894 } 895 return headersArray; 896 } 897 convertStreamPriority(@ronetEngineBase.StreamPriority int priority)898 private static int convertStreamPriority(@CronetEngineBase.StreamPriority int priority) { 899 switch (priority) { 900 case Builder.STREAM_PRIORITY_IDLE: 901 return RequestPriority.IDLE; 902 case Builder.STREAM_PRIORITY_LOWEST: 903 return RequestPriority.LOWEST; 904 case Builder.STREAM_PRIORITY_LOW: 905 return RequestPriority.LOW; 906 case Builder.STREAM_PRIORITY_MEDIUM: 907 return RequestPriority.MEDIUM; 908 case Builder.STREAM_PRIORITY_HIGHEST: 909 return RequestPriority.HIGHEST; 910 default: 911 throw new IllegalArgumentException("Invalid stream priority."); 912 } 913 } 914 915 /** 916 * Posts task to application Executor. Used for callbacks 917 * and other tasks that should not be executed on network thread. 918 */ postTaskToExecutor(Runnable task)919 private void postTaskToExecutor(Runnable task) { 920 try { 921 mExecutor.execute(task); 922 } catch (RejectedExecutionException failException) { 923 Log.e( 924 CronetUrlRequestContext.LOG_TAG, 925 "Exception posting task to executor", 926 failException); 927 // If posting a task throws an exception, then there is no choice 928 // but to destroy the stream without invoking the callback. 929 synchronized (mNativeStreamLock) { 930 mReadState = mWriteState = State.ERROR; 931 destroyNativeStreamLocked(false); 932 } 933 } 934 } 935 prepareResponseInfoOnNetworkThread( int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)936 private UrlResponseInfoImpl prepareResponseInfoOnNetworkThread( 937 int httpStatusCode, 938 String negotiatedProtocol, 939 String[] headers, 940 long receivedByteCount) { 941 UrlResponseInfoImpl responseInfo = 942 new UrlResponseInfoImpl( 943 Arrays.asList(mInitialUrl), 944 httpStatusCode, 945 "", 946 headersListFromStrings(headers), 947 false, 948 negotiatedProtocol, 949 null, 950 receivedByteCount); 951 return responseInfo; 952 } 953 954 @GuardedBy("mNativeStreamLock") destroyNativeStreamLocked(boolean sendOnCanceled)955 private void destroyNativeStreamLocked(boolean sendOnCanceled) { 956 Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + this.toString()); 957 if (mNativeStream == 0) { 958 return; 959 } 960 CronetBidirectionalStreamJni.get() 961 .destroy(mNativeStream, CronetBidirectionalStream.this, sendOnCanceled); 962 mRequestContext.onRequestDestroyed(); 963 mNativeStream = 0; 964 if (mOnDestroyedCallbackForTesting != null) { 965 mOnDestroyedCallbackForTesting.run(); 966 } 967 } 968 969 /** Fails the stream with an exception. Only called on the Executor. */ failWithExceptionOnExecutor(CronetException e)970 private void failWithExceptionOnExecutor(CronetException e) { 971 mException = e; 972 // Do not call into mCallback if request is complete. 973 synchronized (mNativeStreamLock) { 974 if (isDoneLocked()) { 975 return; 976 } 977 mReadState = mWriteState = State.ERROR; 978 destroyNativeStreamLocked(false); 979 } 980 try { 981 mCallback.onFailed(this, mResponseInfo, e); 982 } catch (Exception failException) { 983 Log.e( 984 CronetUrlRequestContext.LOG_TAG, 985 "Exception notifying of failed request", 986 failException); 987 } 988 mInflightDoneCallbackCount.decrement(); 989 } 990 991 /** 992 * If callback method throws an exception, stream gets canceled 993 * and exception is reported via onFailed callback. 994 * Only called on the Executor. 995 */ onCallbackException(Exception e)996 private void onCallbackException(Exception e) { 997 CallbackException streamError = 998 new CallbackExceptionImpl("CalledByNative method has thrown an exception", e); 999 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative method", e); 1000 failWithExceptionOnExecutor(streamError); 1001 } 1002 1003 /** Fails the stream with an exception. Can be called on any thread. */ failWithException(final CronetException exception)1004 private void failWithException(final CronetException exception) { 1005 postTaskToExecutor( 1006 new Runnable() { 1007 @Override 1008 public void run() { 1009 failWithExceptionOnExecutor(exception); 1010 } 1011 }); 1012 } 1013 1014 @NativeMethods 1015 interface Natives { 1016 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. createBidirectionalStream( CronetBidirectionalStream caller, long urlRequestContextAdapter, boolean sendRequestHeadersAutomatically, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)1017 long createBidirectionalStream( 1018 CronetBidirectionalStream caller, 1019 long urlRequestContextAdapter, 1020 boolean sendRequestHeadersAutomatically, 1021 boolean trafficStatsTagSet, 1022 int trafficStatsTag, 1023 boolean trafficStatsUidSet, 1024 int trafficStatsUid, 1025 long networkHandle); 1026 1027 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") start( long nativePtr, CronetBidirectionalStream caller, String url, int priority, String method, String[] headers, boolean endOfStream)1028 int start( 1029 long nativePtr, 1030 CronetBidirectionalStream caller, 1031 String url, 1032 int priority, 1033 String method, 1034 String[] headers, 1035 boolean endOfStream); 1036 1037 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller)1038 void sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller); 1039 1040 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") readData( long nativePtr, CronetBidirectionalStream caller, ByteBuffer byteBuffer, int position, int limit)1041 boolean readData( 1042 long nativePtr, 1043 CronetBidirectionalStream caller, 1044 ByteBuffer byteBuffer, 1045 int position, 1046 int limit); 1047 1048 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") writevData( long nativePtr, CronetBidirectionalStream caller, ByteBuffer[] buffers, int[] positions, int[] limits, boolean endOfStream)1049 boolean writevData( 1050 long nativePtr, 1051 CronetBidirectionalStream caller, 1052 ByteBuffer[] buffers, 1053 int[] positions, 1054 int[] limits, 1055 boolean endOfStream); 1056 1057 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled)1058 void destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled); 1059 } 1060 } 1061