1 // Copyright 2015 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.net.impl; 6 7 import androidx.annotation.IntDef; 8 import androidx.annotation.VisibleForTesting; 9 10 import org.chromium.base.Log; 11 import org.chromium.base.annotations.CalledByNative; 12 import org.chromium.base.annotations.JNINamespace; 13 import org.chromium.base.annotations.NativeClassQualifiedName; 14 import org.chromium.base.annotations.NativeMethods; 15 import android.net.http.BidirectionalStream; 16 import android.net.http.CallbackException; 17 import android.net.http.HeaderBlock; 18 import android.net.http.HttpException; 19 import android.net.http.ExperimentalBidirectionalStream; 20 import android.net.http.NetworkException; 21 import android.net.http.RequestFinishedInfo; 22 import org.chromium.net.RequestPriority; 23 import android.net.http.UrlResponseInfo; 24 25 import java.lang.annotation.Retention; 26 import java.lang.annotation.RetentionPolicy; 27 import java.nio.ByteBuffer; 28 import java.util.AbstractMap; 29 import java.util.ArrayList; 30 import java.util.Arrays; 31 import java.util.Collection; 32 import java.util.LinkedList; 33 import java.util.List; 34 import java.util.Map; 35 import java.util.concurrent.Executor; 36 import java.util.concurrent.RejectedExecutionException; 37 38 import javax.annotation.concurrent.GuardedBy; 39 40 /** 41 * {@link BidirectionalStream} implementation using Chromium network stack. 42 * All @CalledByNative methods are called on the native network thread 43 * and post tasks with callback calls onto Executor. Upon returning from callback, the native 44 * stream is called on Executor thread and posts native tasks to the native network thread. 45 */ 46 @JNINamespace("cronet") 47 @VisibleForTesting 48 public class CronetBidirectionalStream extends ExperimentalBidirectionalStream { 49 /** 50 * States of BidirectionalStream are tracked in mReadState and mWriteState. 51 * The write state is separated out as it changes independently of the read state. 52 * There is one initial state: State.NOT_STARTED. There is one normal final state: 53 * State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. There are two 54 * exceptional final states: State.CANCELED and State.ERROR, which can be reached from 55 * any other non-final state. 56 */ 57 @IntDef({State.NOT_STARTED, State.STARTED, State.WAITING_FOR_READ, State.READING, 58 State.READING_DONE, State.CANCELED, State.ERROR, State.SUCCESS, State.WAITING_FOR_FLUSH, 59 State.WRITING, State.WRITING_DONE}) 60 @Retention(RetentionPolicy.SOURCE) 61 private @interface State { 62 /* Initial state, stream not started. */ 63 int NOT_STARTED = 0; 64 /* 65 * Stream started, request headers are being sent if mDelayRequestHeadersUntilNextFlush 66 * is not set to true. 67 */ 68 int STARTED = 1; 69 /* Waiting for {@code read()} to be called. */ 70 int WAITING_FOR_READ = 2; 71 /* Reading from the remote, {@code onReadCompleted()} callback will be called when done. */ 72 int READING = 3; 73 /* There is no more data to read and stream is half-closed by the remote side. */ 74 int READING_DONE = 4; 75 /* Stream is canceled. */ 76 int CANCELED = 5; 77 /* Error has occurred, stream is closed. */ 78 int ERROR = 6; 79 /* Reading and writing are done, and the stream is closed successfully. */ 80 int SUCCESS = 7; 81 /* Waiting for {@code CronetBidirectionalStreamJni.get().sendRequestHeaders()} or {@code 82 CronetBidirectionalStreamJni.get().writevData()} to be called. */ 83 int WAITING_FOR_FLUSH = 8; 84 /* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */ 85 int WRITING = 9; 86 /* There is no more data to write and stream is half-closed by the local side. */ 87 int WRITING_DONE = 10; 88 } 89 90 private final CronetUrlRequestContext mRequestContext; 91 private final Executor mExecutor; 92 private final VersionSafeCallbacks.BidirectionalStreamCallback mCallback; 93 private final String mInitialUrl; 94 private final int mInitialPriority; 95 private final String mInitialMethod; 96 private final String mRequestHeaders[]; 97 private final HeaderBlock mRequestHeaderBlock; 98 private final boolean mDelayRequestHeadersUntilFirstFlush; 99 private final Collection<Object> mRequestAnnotations; 100 private final boolean mTrafficStatsTagSet; 101 private final int mTrafficStatsTag; 102 private final boolean mTrafficStatsUidSet; 103 private final int mTrafficStatsUid; 104 private final long mNetworkHandle; 105 private HttpException mException; 106 107 /* 108 * Synchronizes access to mNativeStream, mReadState and mWriteState. 109 */ 110 private final Object mNativeStreamLock = new Object(); 111 112 @GuardedBy("mNativeStreamLock") 113 // Pending write data. 114 private LinkedList<ByteBuffer> mPendingData; 115 116 @GuardedBy("mNativeStreamLock") 117 // Flush data queue that should be pushed to the native stack when the previous 118 // CronetBidirectionalStreamJni.get().writevData completes. 119 private LinkedList<ByteBuffer> mFlushData; 120 121 @GuardedBy("mNativeStreamLock") 122 // Whether an end-of-stream flag is passed in through write(). 123 private boolean mEndOfStreamWritten; 124 125 @GuardedBy("mNativeStreamLock") 126 // Whether request headers have been sent. 127 private boolean mRequestHeadersSent; 128 129 @GuardedBy("mNativeStreamLock") 130 // Metrics information. Obtained when request succeeds, fails or is canceled. 131 private RequestFinishedInfo.Metrics mMetrics; 132 133 /* Native BidirectionalStream object, owned by CronetBidirectionalStream. */ 134 @GuardedBy("mNativeStreamLock") 135 private long mNativeStream; 136 137 /** 138 * Read state is tracking reading flow. 139 * / <--- READING <--- \ 140 * | | 141 * \ / 142 * NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS 143 */ 144 @GuardedBy("mNativeStreamLock") 145 private @State int mReadState = State.NOT_STARTED; 146 147 /** 148 * Write state is tracking writing flow. 149 * / <--- WRITING <--- \ 150 * | | 151 * \ / 152 * NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS 153 */ 154 @GuardedBy("mNativeStreamLock") 155 private @State int mWriteState = State.NOT_STARTED; 156 157 // Only modified on the network thread. 158 private UrlResponseInfoImpl mResponseInfo; 159 160 /* 161 * OnReadCompleted callback is repeatedly invoked when each read is completed, so it 162 * is cached as a member variable. 163 */ 164 // Only modified on the network thread. 165 private OnReadCompletedRunnable mOnReadCompletedTask; 166 167 private Runnable mOnDestroyedCallbackForTesting; 168 169 private final class OnReadCompletedRunnable implements Runnable { 170 // Buffer passed back from current invocation of onReadCompleted. 171 ByteBuffer mByteBuffer; 172 // End of stream flag from current invocation of onReadCompleted. 173 boolean mEndOfStream; 174 175 @Override run()176 public void run() { 177 try { 178 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 179 ByteBuffer buffer = mByteBuffer; 180 mByteBuffer = null; 181 boolean maybeOnSucceeded = false; 182 synchronized (mNativeStreamLock) { 183 if (isDoneLocked()) { 184 return; 185 } 186 if (mEndOfStream) { 187 mReadState = State.READING_DONE; 188 maybeOnSucceeded = (mWriteState == State.WRITING_DONE); 189 } else { 190 mReadState = State.WAITING_FOR_READ; 191 } 192 } 193 mCallback.onReadCompleted( 194 CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); 195 if (maybeOnSucceeded) { 196 maybeOnSucceededOnExecutor(); 197 } 198 } catch (Exception e) { 199 onCallbackException(e); 200 } 201 } 202 } 203 204 private final class OnWriteCompletedRunnable implements Runnable { 205 // Buffer passed back from current invocation of onWriteCompleted. 206 private ByteBuffer mByteBuffer; 207 // End of stream flag from current call to write. 208 private final boolean mEndOfStream; 209 OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream)210 OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) { 211 mByteBuffer = buffer; 212 mEndOfStream = endOfStream; 213 } 214 215 @Override run()216 public void run() { 217 try { 218 // Null out mByteBuffer, to pass buffer ownership to callback or release if done. 219 ByteBuffer buffer = mByteBuffer; 220 mByteBuffer = null; 221 boolean maybeOnSucceeded = false; 222 synchronized (mNativeStreamLock) { 223 if (isDoneLocked()) { 224 return; 225 } 226 if (mEndOfStream) { 227 mWriteState = State.WRITING_DONE; 228 maybeOnSucceeded = (mReadState == State.READING_DONE); 229 } 230 } 231 mCallback.onWriteCompleted( 232 CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream); 233 if (maybeOnSucceeded) { 234 maybeOnSucceededOnExecutor(); 235 } 236 } catch (Exception e) { 237 onCallbackException(e); 238 } 239 } 240 } 241 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, @CronetEngineBase.StreamPriority int priority, Callback callback, Executor executor, String httpMethod, List<Map.Entry<String, String>> requestHeaders, boolean delayRequestHeadersUntilNextFlush, Collection<Object> requestAnnotations, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)242 CronetBidirectionalStream(CronetUrlRequestContext requestContext, String url, 243 @CronetEngineBase.StreamPriority int priority, Callback callback, Executor executor, 244 String httpMethod, List<Map.Entry<String, String>> requestHeaders, 245 boolean delayRequestHeadersUntilNextFlush, Collection<Object> requestAnnotations, 246 boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, 247 int trafficStatsUid, long networkHandle) { 248 mRequestContext = requestContext; 249 mInitialUrl = url; 250 mInitialPriority = convertStreamPriority(priority); 251 mCallback = new VersionSafeCallbacks.BidirectionalStreamCallback(callback); 252 mExecutor = executor; 253 mInitialMethod = httpMethod; 254 mRequestHeaders = stringsFromHeaderList(requestHeaders); 255 mRequestHeaderBlock = new HeaderBlockImpl(requestHeaders); 256 mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush; 257 mPendingData = new LinkedList<>(); 258 mFlushData = new LinkedList<>(); 259 mRequestAnnotations = requestAnnotations; 260 mTrafficStatsTagSet = trafficStatsTagSet; 261 mTrafficStatsTag = trafficStatsTag; 262 mTrafficStatsUidSet = trafficStatsUidSet; 263 mTrafficStatsUid = trafficStatsUid; 264 mNetworkHandle = networkHandle; 265 } 266 267 @Override getHttpMethod()268 public String getHttpMethod() { 269 return mInitialMethod; 270 } 271 272 @Override hasTrafficStatsTag()273 public boolean hasTrafficStatsTag() { 274 return mTrafficStatsTagSet; 275 } 276 277 @Override getTrafficStatsTag()278 public int getTrafficStatsTag() { 279 if (!hasTrafficStatsTag()) { 280 throw new IllegalStateException("TrafficStatsTag is not set"); 281 } 282 return mTrafficStatsTag; 283 } 284 285 @Override hasTrafficStatsUid()286 public boolean hasTrafficStatsUid() { 287 return mTrafficStatsUidSet; 288 } 289 290 @Override getTrafficStatsUid()291 public int getTrafficStatsUid() { 292 if (!hasTrafficStatsUid()) { 293 throw new IllegalStateException("TrafficStatsUid is not set"); 294 } 295 return mTrafficStatsUid; 296 } 297 298 @Override getHeaders()299 public HeaderBlock getHeaders() { 300 return mRequestHeaderBlock; 301 } 302 303 @Override getPriority()304 public int getPriority() { 305 switch (mInitialPriority) { 306 case RequestPriority.IDLE: 307 return STREAM_PRIORITY_IDLE; 308 case RequestPriority.LOWEST: 309 return STREAM_PRIORITY_LOWEST; 310 case RequestPriority.LOW: 311 return STREAM_PRIORITY_LOW; 312 case RequestPriority.MEDIUM: 313 return STREAM_PRIORITY_MEDIUM; 314 case RequestPriority.HIGHEST: 315 return STREAM_PRIORITY_HIGHEST; 316 default: 317 throw new IllegalStateException("Invalid stream priority: " + mInitialPriority); 318 } 319 } 320 321 @Override isDelayRequestHeadersUntilFirstFlushEnabled()322 public boolean isDelayRequestHeadersUntilFirstFlushEnabled() { 323 return mDelayRequestHeadersUntilFirstFlush; 324 } 325 326 @Override start()327 public void start() { 328 synchronized (mNativeStreamLock) { 329 if (mReadState != State.NOT_STARTED) { 330 throw new IllegalStateException("Stream is already started."); 331 } 332 try { 333 mNativeStream = CronetBidirectionalStreamJni.get().createBidirectionalStream( 334 CronetBidirectionalStream.this, 335 mRequestContext.getUrlRequestContextAdapter(), 336 !mDelayRequestHeadersUntilFirstFlush, mTrafficStatsTagSet, mTrafficStatsTag, 337 mTrafficStatsUidSet, mTrafficStatsUid, mNetworkHandle); 338 mRequestContext.onRequestStarted(); 339 // Non-zero startResult means an argument error. 340 int startResult = CronetBidirectionalStreamJni.get().start(mNativeStream, 341 CronetBidirectionalStream.this, mInitialUrl, mInitialPriority, 342 mInitialMethod, mRequestHeaders, !doesMethodAllowWriteData(mInitialMethod)); 343 if (startResult == -1) { 344 throw new IllegalArgumentException("Invalid http method " + mInitialMethod); 345 } 346 if (startResult > 0) { 347 int headerPos = startResult - 1; 348 throw new IllegalArgumentException("Invalid header " 349 + mRequestHeaders[headerPos] + "=" + mRequestHeaders[headerPos + 1]); 350 } 351 mReadState = mWriteState = State.STARTED; 352 } catch (RuntimeException e) { 353 // If there's an exception, clean up and then throw the 354 // exception to the caller. 355 destroyNativeStreamLocked(false); 356 throw e; 357 } 358 } 359 } 360 361 @Override read(ByteBuffer buffer)362 public void read(ByteBuffer buffer) { 363 synchronized (mNativeStreamLock) { 364 Preconditions.checkHasRemaining(buffer); 365 Preconditions.checkDirect(buffer); 366 if (mReadState != State.WAITING_FOR_READ) { 367 throw new IllegalStateException("Unexpected read attempt."); 368 } 369 if (isDoneLocked()) { 370 return; 371 } 372 if (mOnReadCompletedTask == null) { 373 mOnReadCompletedTask = new OnReadCompletedRunnable(); 374 } 375 mReadState = State.READING; 376 if (!CronetBidirectionalStreamJni.get().readData(mNativeStream, 377 CronetBidirectionalStream.this, buffer, buffer.position(), 378 buffer.limit())) { 379 // Still waiting on read. This is just to have consistent 380 // behavior with the other error cases. 381 mReadState = State.WAITING_FOR_READ; 382 throw new IllegalArgumentException("Unable to call native read"); 383 } 384 } 385 } 386 387 @Override write(ByteBuffer buffer, boolean endOfStream)388 public void write(ByteBuffer buffer, boolean endOfStream) { 389 synchronized (mNativeStreamLock) { 390 Preconditions.checkDirect(buffer); 391 if (!buffer.hasRemaining() && !endOfStream) { 392 throw new IllegalArgumentException("Empty buffer before end of stream."); 393 } 394 if (mEndOfStreamWritten) { 395 throw new IllegalArgumentException("Write after writing end of stream."); 396 } 397 if (isDoneLocked()) { 398 return; 399 } 400 mPendingData.add(buffer); 401 if (endOfStream) { 402 mEndOfStreamWritten = true; 403 } 404 } 405 } 406 407 @Override flush()408 public void flush() { 409 synchronized (mNativeStreamLock) { 410 if (isDoneLocked() 411 || (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != State.WRITING)) { 412 return; 413 } 414 if (mPendingData.isEmpty() && mFlushData.isEmpty()) { 415 // If there is no pending write when flush() is called, see if 416 // request headers need to be flushed. 417 if (!mRequestHeadersSent) { 418 mRequestHeadersSent = true; 419 CronetBidirectionalStreamJni.get().sendRequestHeaders( 420 mNativeStream, CronetBidirectionalStream.this); 421 if (!doesMethodAllowWriteData(mInitialMethod)) { 422 mWriteState = State.WRITING_DONE; 423 } 424 } 425 return; 426 } 427 428 assert !mPendingData.isEmpty() || !mFlushData.isEmpty(); 429 430 // Move buffers from mPendingData to the flushing queue. 431 if (!mPendingData.isEmpty()) { 432 mFlushData.addAll(mPendingData); 433 mPendingData.clear(); 434 } 435 436 if (mWriteState == State.WRITING) { 437 // If there is a write already pending, wait until onWritevCompleted is 438 // called before pushing data to the native stack. 439 return; 440 } 441 sendFlushDataLocked(); 442 } 443 } 444 445 // Helper method to send buffers in mFlushData. Caller needs to acquire 446 // mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and 447 // mFlushData queue isn't empty. 448 @SuppressWarnings("GuardedByChecker") sendFlushDataLocked()449 private void sendFlushDataLocked() { 450 assert mWriteState == State.WAITING_FOR_FLUSH; 451 int size = mFlushData.size(); 452 ByteBuffer[] buffers = new ByteBuffer[size]; 453 int[] positions = new int[size]; 454 int[] limits = new int[size]; 455 for (int i = 0; i < size; i++) { 456 ByteBuffer buffer = mFlushData.poll(); 457 buffers[i] = buffer; 458 positions[i] = buffer.position(); 459 limits[i] = buffer.limit(); 460 } 461 assert mFlushData.isEmpty(); 462 assert buffers.length >= 1; 463 mWriteState = State.WRITING; 464 mRequestHeadersSent = true; 465 if (!CronetBidirectionalStreamJni.get().writevData(mNativeStream, 466 CronetBidirectionalStream.this, buffers, positions, limits, 467 mEndOfStreamWritten && mPendingData.isEmpty())) { 468 // Still waiting on flush. This is just to have consistent 469 // behavior with the other error cases. 470 mWriteState = State.WAITING_FOR_FLUSH; 471 throw new IllegalArgumentException("Unable to call native writev."); 472 } 473 } 474 475 /** 476 * Returns a read-only copy of {@code mPendingData} for testing. 477 */ 478 @VisibleForTesting getPendingDataForTesting()479 public List<ByteBuffer> getPendingDataForTesting() { 480 synchronized (mNativeStreamLock) { 481 List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>(); 482 for (ByteBuffer buffer : mPendingData) { 483 pendingData.add(buffer.asReadOnlyBuffer()); 484 } 485 return pendingData; 486 } 487 } 488 489 /** 490 * Returns a read-only copy of {@code mFlushData} for testing. 491 */ 492 @VisibleForTesting getFlushDataForTesting()493 public List<ByteBuffer> getFlushDataForTesting() { 494 synchronized (mNativeStreamLock) { 495 List<ByteBuffer> flushData = new LinkedList<ByteBuffer>(); 496 for (ByteBuffer buffer : mFlushData) { 497 flushData.add(buffer.asReadOnlyBuffer()); 498 } 499 return flushData; 500 } 501 } 502 503 @Override cancel()504 public void cancel() { 505 synchronized (mNativeStreamLock) { 506 if (isDoneLocked() || mReadState == State.NOT_STARTED) { 507 return; 508 } 509 mReadState = mWriteState = State.CANCELED; 510 destroyNativeStreamLocked(true); 511 } 512 } 513 514 @Override isDone()515 public boolean isDone() { 516 synchronized (mNativeStreamLock) { 517 return isDoneLocked(); 518 } 519 } 520 521 @GuardedBy("mNativeStreamLock") isDoneLocked()522 private boolean isDoneLocked() { 523 return mReadState != State.NOT_STARTED && mNativeStream == 0; 524 } 525 526 /* 527 * Runs an onSucceeded callback if both Read and Write sides are closed. 528 */ maybeOnSucceededOnExecutor()529 private void maybeOnSucceededOnExecutor() { 530 synchronized (mNativeStreamLock) { 531 if (isDoneLocked()) { 532 return; 533 } 534 if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) { 535 return; 536 } 537 mReadState = mWriteState = State.SUCCESS; 538 // Destroy native stream first, so UrlRequestContext could be shut 539 // down from the listener. 540 destroyNativeStreamLocked(false); 541 } 542 try { 543 mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo); 544 } catch (Exception e) { 545 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e); 546 } 547 } 548 549 @SuppressWarnings("unused") 550 @CalledByNative onStreamReady(final boolean requestHeadersSent)551 private void onStreamReady(final boolean requestHeadersSent) { 552 postTaskToExecutor(new Runnable() { 553 @Override 554 public void run() { 555 synchronized (mNativeStreamLock) { 556 if (isDoneLocked()) { 557 return; 558 } 559 mRequestHeadersSent = requestHeadersSent; 560 mReadState = State.WAITING_FOR_READ; 561 if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHeadersSent) { 562 mWriteState = State.WRITING_DONE; 563 } else { 564 mWriteState = State.WAITING_FOR_FLUSH; 565 } 566 } 567 568 try { 569 mCallback.onStreamReady(CronetBidirectionalStream.this); 570 } catch (Exception e) { 571 onCallbackException(e); 572 } 573 } 574 }); 575 } 576 577 /** 578 * Called when the final set of headers, after all redirects, 579 * is received. Can only be called once for each stream. 580 */ 581 @SuppressWarnings("unused") 582 @CalledByNative onResponseHeadersReceived(int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)583 private void onResponseHeadersReceived(int httpStatusCode, String negotiatedProtocol, 584 String[] headers, long receivedByteCount) { 585 try { 586 mResponseInfo = prepareResponseInfoOnNetworkThread( 587 httpStatusCode, negotiatedProtocol, headers, receivedByteCount); 588 } catch (Exception e) { 589 failWithException(new CronetExceptionImpl("Cannot prepare ResponseInfo", null)); 590 return; 591 } 592 postTaskToExecutor(new Runnable() { 593 @Override 594 public void run() { 595 synchronized (mNativeStreamLock) { 596 if (isDoneLocked()) { 597 return; 598 } 599 mReadState = State.WAITING_FOR_READ; 600 } 601 602 try { 603 mCallback.onResponseHeadersReceived( 604 CronetBidirectionalStream.this, mResponseInfo); 605 } catch (Exception e) { 606 onCallbackException(e); 607 } 608 } 609 }); 610 } 611 612 @SuppressWarnings("unused") 613 @CalledByNative onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition, int initialLimit, long receivedByteCount)614 private void onReadCompleted(final ByteBuffer byteBuffer, int bytesRead, int initialPosition, 615 int initialLimit, long receivedByteCount) { 616 mResponseInfo.setReceivedByteCount(receivedByteCount); 617 if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) { 618 failWithException( 619 new CronetExceptionImpl("ByteBuffer modified externally during read", null)); 620 return; 621 } 622 if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) { 623 failWithException(new CronetExceptionImpl("Invalid number of bytes read", null)); 624 return; 625 } 626 byteBuffer.position(initialPosition + bytesRead); 627 assert mOnReadCompletedTask.mByteBuffer == null; 628 mOnReadCompletedTask.mByteBuffer = byteBuffer; 629 mOnReadCompletedTask.mEndOfStream = (bytesRead == 0); 630 postTaskToExecutor(mOnReadCompletedTask); 631 } 632 633 @SuppressWarnings("unused") 634 @CalledByNative onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, int[] initialLimits, boolean endOfStream)635 private void onWritevCompleted(final ByteBuffer[] byteBuffers, int[] initialPositions, 636 int[] initialLimits, boolean endOfStream) { 637 assert byteBuffers.length == initialPositions.length; 638 assert byteBuffers.length == initialLimits.length; 639 synchronized (mNativeStreamLock) { 640 if (isDoneLocked()) return; 641 mWriteState = State.WAITING_FOR_FLUSH; 642 // Flush if there is anything in the flush queue mFlushData. 643 if (!mFlushData.isEmpty()) { 644 sendFlushDataLocked(); 645 } 646 } 647 for (int i = 0; i < byteBuffers.length; i++) { 648 ByteBuffer buffer = byteBuffers[i]; 649 if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) { 650 failWithException(new CronetExceptionImpl( 651 "ByteBuffer modified externally during write", null)); 652 return; 653 } 654 // Current implementation always writes the complete buffer. 655 buffer.position(buffer.limit()); 656 postTaskToExecutor(new OnWriteCompletedRunnable(buffer, 657 // Only set endOfStream flag if this buffer is the last in byteBuffers. 658 endOfStream && i == byteBuffers.length - 1)); 659 } 660 } 661 662 @SuppressWarnings("unused") 663 @CalledByNative onResponseTrailersReceived(String[] trailers)664 private void onResponseTrailersReceived(String[] trailers) { 665 final HeaderBlock trailersBlock = new HeaderBlockImpl(headersListFromStrings(trailers)); 666 postTaskToExecutor(new Runnable() { 667 @Override 668 public void run() { 669 synchronized (mNativeStreamLock) { 670 if (isDoneLocked()) { 671 return; 672 } 673 } 674 try { 675 mCallback.onResponseTrailersReceived( 676 CronetBidirectionalStream.this, mResponseInfo, trailersBlock); 677 } catch (Exception e) { 678 onCallbackException(e); 679 } 680 } 681 }); 682 } 683 684 @SuppressWarnings("unused") 685 @CalledByNative onError(int errorCode, int nativeError, int nativeQuicError, String errorString, long receivedByteCount)686 private void onError(int errorCode, int nativeError, int nativeQuicError, String errorString, 687 long receivedByteCount) { 688 if (mResponseInfo != null) { 689 mResponseInfo.setReceivedByteCount(receivedByteCount); 690 } 691 if (errorCode == NetworkException.ERROR_QUIC_PROTOCOL_FAILED 692 || errorCode == NetworkException.ERROR_NETWORK_CHANGED) { 693 failWithException( 694 new QuicExceptionImpl("Exception in BidirectionalStream: " + errorString, 695 errorCode, nativeError, nativeQuicError)); 696 } else { 697 failWithException(new BidirectionalStreamNetworkException( 698 "Exception in BidirectionalStream: " + errorString, errorCode, nativeError)); 699 } 700 } 701 702 /** 703 * Called when request is canceled, no callbacks will be called afterwards. 704 */ 705 @SuppressWarnings("unused") 706 @CalledByNative onCanceled()707 private void onCanceled() { 708 postTaskToExecutor(new Runnable() { 709 @Override 710 public void run() { 711 try { 712 mCallback.onCanceled(CronetBidirectionalStream.this, mResponseInfo); 713 } catch (Exception e) { 714 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCanceled method", e); 715 } 716 } 717 }); 718 } 719 720 /** 721 * Called by the native code to report metrics just before the native adapter is destroyed. 722 */ 723 @SuppressWarnings("unused") 724 @CalledByNative onMetricsCollected(long requestStartMs, long dnsStartMs, long dnsEndMs, long connectStartMs, long connectEndMs, long sslStartMs, long sslEndMs, long sendingStartMs, long sendingEndMs, long pushStartMs, long pushEndMs, long responseStartMs, long requestEndMs, boolean socketReused, long sentByteCount, long receivedByteCount)725 private void onMetricsCollected(long requestStartMs, long dnsStartMs, long dnsEndMs, 726 long connectStartMs, long connectEndMs, long sslStartMs, long sslEndMs, 727 long sendingStartMs, long sendingEndMs, long pushStartMs, long pushEndMs, 728 long responseStartMs, long requestEndMs, boolean socketReused, long sentByteCount, 729 long receivedByteCount) { 730 synchronized (mNativeStreamLock) { 731 if (mMetrics != null) { 732 throw new IllegalStateException("Metrics collection should only happen once."); 733 } 734 mMetrics = new CronetMetrics(requestStartMs, dnsStartMs, dnsEndMs, connectStartMs, 735 connectEndMs, sslStartMs, sslEndMs, sendingStartMs, sendingEndMs, pushStartMs, 736 pushEndMs, responseStartMs, requestEndMs, socketReused, sentByteCount, 737 receivedByteCount); 738 assert mReadState == mWriteState; 739 assert (mReadState == State.SUCCESS) || (mReadState == State.ERROR) 740 || (mReadState == State.CANCELED); 741 int finishedReason; 742 if (mReadState == State.SUCCESS) { 743 finishedReason = RequestFinishedInfo.SUCCEEDED; 744 } else if (mReadState == State.CANCELED) { 745 finishedReason = RequestFinishedInfo.CANCELED; 746 } else { 747 finishedReason = RequestFinishedInfo.FAILED; 748 } 749 final RequestFinishedInfo requestFinishedInfo = new RequestFinishedInfoImpl(mInitialUrl, 750 mRequestAnnotations, mMetrics, finishedReason, mResponseInfo, mException); 751 mRequestContext.reportRequestFinished(requestFinishedInfo); 752 } 753 } 754 755 @VisibleForTesting setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting)756 public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) { 757 mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting; 758 } 759 doesMethodAllowWriteData(String methodName)760 private static boolean doesMethodAllowWriteData(String methodName) { 761 return !methodName.equals("GET") && !methodName.equals("HEAD"); 762 } 763 headersListFromStrings(String[] headers)764 private static ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) { 765 ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(headers.length / 2); 766 for (int i = 0; i < headers.length; i += 2) { 767 headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], headers[i + 1])); 768 } 769 return headersList; 770 } 771 stringsFromHeaderList(List<Map.Entry<String, String>> headersList)772 private static String[] stringsFromHeaderList(List<Map.Entry<String, String>> headersList) { 773 String headersArray[] = new String[headersList.size() * 2]; 774 int i = 0; 775 for (Map.Entry<String, String> requestHeader : headersList) { 776 headersArray[i++] = requestHeader.getKey(); 777 headersArray[i++] = requestHeader.getValue(); 778 } 779 return headersArray; 780 } 781 convertStreamPriority(@ronetEngineBase.StreamPriority int priority)782 private static int convertStreamPriority(@CronetEngineBase.StreamPriority int priority) { 783 switch (priority) { 784 case STREAM_PRIORITY_IDLE: 785 return RequestPriority.IDLE; 786 case STREAM_PRIORITY_LOWEST: 787 return RequestPriority.LOWEST; 788 case STREAM_PRIORITY_LOW: 789 return RequestPriority.LOW; 790 case STREAM_PRIORITY_MEDIUM: 791 return RequestPriority.MEDIUM; 792 case STREAM_PRIORITY_HIGHEST: 793 return RequestPriority.HIGHEST; 794 default: 795 throw new IllegalArgumentException("Invalid stream priority."); 796 } 797 } 798 799 /** 800 * Posts task to application Executor. Used for callbacks 801 * and other tasks that should not be executed on network thread. 802 */ postTaskToExecutor(Runnable task)803 private void postTaskToExecutor(Runnable task) { 804 try { 805 mExecutor.execute(task); 806 } catch (RejectedExecutionException failException) { 807 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to executor", 808 failException); 809 // If posting a task throws an exception, then there is no choice 810 // but to destroy the stream without invoking the callback. 811 synchronized (mNativeStreamLock) { 812 mReadState = mWriteState = State.ERROR; 813 destroyNativeStreamLocked(false); 814 } 815 } 816 } 817 prepareResponseInfoOnNetworkThread(int httpStatusCode, String negotiatedProtocol, String[] headers, long receivedByteCount)818 private UrlResponseInfoImpl prepareResponseInfoOnNetworkThread(int httpStatusCode, 819 String negotiatedProtocol, String[] headers, long receivedByteCount) { 820 UrlResponseInfoImpl responseInfo = new UrlResponseInfoImpl(Arrays.asList(mInitialUrl), 821 httpStatusCode, "", headersListFromStrings(headers), false, negotiatedProtocol, 822 null, receivedByteCount); 823 return responseInfo; 824 } 825 826 @GuardedBy("mNativeStreamLock") destroyNativeStreamLocked(boolean sendOnCanceled)827 private void destroyNativeStreamLocked(boolean sendOnCanceled) { 828 Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + this.toString()); 829 if (mNativeStream == 0) { 830 return; 831 } 832 CronetBidirectionalStreamJni.get().destroy( 833 mNativeStream, CronetBidirectionalStream.this, sendOnCanceled); 834 mRequestContext.onRequestDestroyed(); 835 mNativeStream = 0; 836 if (mOnDestroyedCallbackForTesting != null) { 837 mOnDestroyedCallbackForTesting.run(); 838 } 839 } 840 841 /** 842 * Fails the stream with an exception. Only called on the Executor. 843 */ failWithExceptionOnExecutor(HttpException e)844 private void failWithExceptionOnExecutor(HttpException e) { 845 mException = e; 846 // Do not call into mCallback if request is complete. 847 synchronized (mNativeStreamLock) { 848 if (isDoneLocked()) { 849 return; 850 } 851 mReadState = mWriteState = State.ERROR; 852 destroyNativeStreamLocked(false); 853 } 854 try { 855 mCallback.onFailed(this, mResponseInfo, e); 856 } catch (Exception failException) { 857 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception notifying of failed request", 858 failException); 859 } 860 } 861 862 /** 863 * If callback method throws an exception, stream gets canceled 864 * and exception is reported via onFailed callback. 865 * Only called on the Executor. 866 */ onCallbackException(Exception e)867 private void onCallbackException(Exception e) { 868 CallbackException streamError = 869 new CallbackExceptionImpl("CalledByNative method has thrown an exception", e); 870 Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative method", e); 871 failWithExceptionOnExecutor(streamError); 872 } 873 874 /** 875 * Fails the stream with an exception. Can be called on any thread. 876 */ failWithException(final HttpException exception)877 private void failWithException(final HttpException exception) { 878 postTaskToExecutor(new Runnable() { 879 @Override 880 public void run() { 881 failWithExceptionOnExecutor(exception); 882 } 883 }); 884 } 885 886 @NativeMethods 887 interface Natives { 888 // Native methods are implemented in cronet_bidirectional_stream_adapter.cc. createBidirectionalStream(CronetBidirectionalStream caller, long urlRequestContextAdapter, boolean sendRequestHeadersAutomatically, boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, int trafficStatsUid, long networkHandle)889 long createBidirectionalStream(CronetBidirectionalStream caller, 890 long urlRequestContextAdapter, boolean sendRequestHeadersAutomatically, 891 boolean trafficStatsTagSet, int trafficStatsTag, boolean trafficStatsUidSet, 892 int trafficStatsUid, long networkHandle); 893 894 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") start(long nativePtr, CronetBidirectionalStream caller, String url, int priority, String method, String[] headers, boolean endOfStream)895 int start(long nativePtr, CronetBidirectionalStream caller, String url, int priority, 896 String method, String[] headers, boolean endOfStream); 897 898 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller)899 void sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller); 900 901 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") readData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer byteBuffer, int position, int limit)902 boolean readData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer byteBuffer, 903 int position, int limit); 904 905 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") writevData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer[] buffers, int[] positions, int[] limits, boolean endOfStream)906 boolean writevData(long nativePtr, CronetBidirectionalStream caller, ByteBuffer[] buffers, 907 int[] positions, int[] limits, boolean endOfStream); 908 909 @NativeClassQualifiedName("CronetBidirectionalStreamAdapter") destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled)910 void destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled); 911 } 912 } 913