// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.net;

import static com.google.common.truth.Truth.assertThat;

import android.os.ConditionVariable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Callback that tracks information from the different stream callbacks and has a
 * method to block the calling thread until the stream completes on another thread.
 * Allows a test to cancel the stream, pause it, or throw an exception at an arbitrary step.
 */
public class TestBidirectionalStreamCallback extends BidirectionalStream.Callback {
    // Most recent UrlResponseInfo handed to any callback that receives one.
    private UrlResponseInfo mResponseInfo;
    // Error passed to onFailed(), or null if onFailed() has not been invoked.
    public CronetException mError;

    // The last callback step that was entered.
    public ResponseStep mResponseStep = ResponseStep.NOTHING;

    public boolean mOnErrorCalled;
    public boolean mOnCanceledCalled;

    // Total number of response body bytes seen by onReadCompleted().
    public int mHttpResponseDataLength;
    // Response body accumulated so far, decoded chunk by chunk in onReadCompleted().
    public String mResponseAsString = "";

    public UrlResponseInfo.HeaderBlock mTrailers;

    private static final int READ_BUFFER_SIZE = 32 * 1024;

    // When false, the consumer is responsible for all calls into the stream
    // that advance it.
    private boolean mAutoAdvance = true;

    // The executor thread will block on this after reaching a terminal method.
    // Terminal methods are (onSucceeded, onFailed or onCanceled).
    // Starts opened, so terminal callbacks do not block unless
    // setBlockOnTerminalState(true) has been called.
    private final ConditionVariable mBlockOnTerminalState = new ConditionVariable(true);

    // Conditionally fail on certain steps.
    private FailureType mFailureType = FailureType.NONE;
    private ResponseStep mFailureStep = ResponseStep.NOTHING;

    // Signals when the stream is done either successfully or not.
    private final ConditionVariable mDone = new ConditionVariable();

    // Signaled on each step when mAutoAdvance is false.
    private final ConditionVariable mReadStepBlock = new ConditionVariable();
    private final ConditionVariable mWriteStepBlock = new ConditionVariable();

    // Executor Service for Cronet callbacks.
    private final ExecutorService mExecutorService =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
    // Thread created by ExecutorThreadFactory; used by checkOnValidThread().
    private Thread mExecutorThread;

    // position() of ByteBuffer prior to read() call.
    private int mBufferPositionBeforeRead;

    // Data to write.
    private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();

    // Buffers for which the corresponding onWriteCompleted callback has not yet been received.
    private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteBuffer>();

    // Whether to use a direct executor.
    private final boolean mUseDirectExecutor;
    private final DirectExecutor mDirectExecutor;

    /**
     * Thread factory that remembers the thread it creates in {@code mExecutorThread}.
     * Deliberately a non-static inner class because it writes a field of the enclosing callback.
     */
    private class ExecutorThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            mExecutorThread = new Thread(r);
            return mExecutorThread;
        }
    }

    /** Pairs a buffer queued for writing with whether the stream is flushed after writing it. */
    private static class WriteBuffer {
        final ByteBuffer mBuffer;
        final boolean mFlush;

        public WriteBuffer(ByteBuffer buffer, boolean flush) {
            mBuffer = buffer;
            mFlush = flush;
        }
    }

    /** Executor that runs each task synchronously on the thread calling {@code execute}. */
    private static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    }

    /** Identifies the callback step that was most recently entered. */
    public enum ResponseStep {
        NOTHING,
        ON_STREAM_READY,
        ON_RESPONSE_STARTED,
        ON_READ_COMPLETED,
        ON_WRITE_COMPLETED,
        ON_TRAILERS,
        ON_CANCELED,
        ON_FAILED,
        ON_SUCCEEDED,
    }

    /** What the callback should do when the configured failure step is reached. */
    public enum FailureType {
        NONE,
        CANCEL_SYNC,
        CANCEL_ASYNC,
        // Same as above, but continues to advance the stream after posting
        // the cancellation task.
        CANCEL_ASYNC_WITHOUT_PAUSE,
        THROW_SYNC
    }

    /**
     * Creates a callback whose {@link #getExecutor()} is a dedicated single-thread executor.
     */
    public TestBidirectionalStreamCallback() {
        mUseDirectExecutor = false;
        mDirectExecutor = null;
    }

    /**
     * Creates a callback that optionally runs callbacks on a direct executor.
     *
     * @param useDirectExecutor when {@code true}, {@link #getExecutor()} returns an executor that
     *         runs tasks inline on the calling thread; otherwise the single-thread executor is
     *         returned.
     */
    public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
        mUseDirectExecutor = useDirectExecutor;
        mDirectExecutor = new DirectExecutor();
    }

    /**
     * This blocks the callback executor thread once it has reached a final state callback.
     * In order to continue execution, this method must be called again, providing {@code false}.
     *
     * @param blockOnTerminalState the state to set for the executor thread
     */
    public void setBlockOnTerminalState(boolean blockOnTerminalState) {
        if (blockOnTerminalState) {
            mBlockOnTerminalState.close();
        } else {
            mBlockOnTerminalState.open();
        }
    }

    /**
     * Sets whether the callback advances the stream by itself.
     *
     * @param autoAdvance when {@code false}, each step only opens the corresponding step block
     *         and the consumer must issue the next read/write itself.
     */
    public void setAutoAdvance(boolean autoAdvance) {
        mAutoAdvance = autoAdvance;
    }

    /**
     * Configures an action to take when a given step is reached.
     *
     * @param failureType the action to take.
     * @param failureStep the step at which to take the action.
     */
    public void setFailure(FailureType failureType, ResponseStep failureStep) {
        mFailureStep = failureStep;
        mFailureType = failureType;
    }

    /** Blocks the calling thread until a terminal callback has been reached. */
    public void blockForDone() {
        mDone.block();
    }

    /**
     * Blocks until the read step block has been opened, then closes it again so that the next
     * call waits for the following step.
     */
    public void waitForNextReadStep() {
        mReadStepBlock.block();
        mReadStepBlock.close();
    }

    /**
     * Blocks until the write step block has been opened, then closes it again so that the next
     * call waits for the following step.
     */
    public void waitForNextWriteStep() {
        mWriteStepBlock.block();
        mWriteStepBlock.close();
    }

    /**
     * Returns the executor callbacks are expected to run on: the direct executor when this
     * callback was configured to use one, otherwise the single-thread executor service.
     */
    public Executor getExecutor() {
        if (mUseDirectExecutor) {
            return mDirectExecutor;
        }
        return mExecutorService;
    }

    /**
     * Shuts down the single-thread executor service.
     *
     * @throws UnsupportedOperationException if this callback uses the direct executor.
     */
    public void shutdownExecutor() {
        if (mUseDirectExecutor) {
            throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
        }
        mExecutorService.shutdown();
    }

    /**
     * Queues {@code data} to be written, flushing the stream after it is written.
     *
     * @param data bytes to write.
     */
    public void addWriteData(byte[] data) {
        addWriteData(data, true);
    }

    /**
     * Queues {@code data} to be written.
     *
     * @param data bytes to write; they are copied into a newly allocated direct buffer.
     * @param flush whether the stream is flushed right after this buffer is written.
     */
    public void addWriteData(byte[] data, boolean flush) {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
        writeBuffer.put(data);
        writeBuffer.flip();
        // The same ByteBuffer is tracked twice: once as pending data and once as awaiting its
        // onWriteCompleted acknowledgement.
        mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
        mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
    }

    @Override
    public void onStreamReady(BidirectionalStream stream) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mResponseStep).isEqualTo(ResponseStep.NOTHING);
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_STREAM_READY;
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mResponseStep)
                .isAnyOf(
                        ResponseStep.NOTHING,
                        ResponseStep.ON_STREAM_READY,
                        ResponseStep.ON_WRITE_COMPLETED);
        assertThat(mError).isNull();

        mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
        mResponseInfo = info;
        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        startNextRead(stream);
    }

    @Override
    public void onReadCompleted(
            BidirectionalStream stream,
            UrlResponseInfo info,
            ByteBuffer byteBuffer,
            boolean endOfStream) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mResponseStep)
                .isAnyOf(
                        ResponseStep.ON_RESPONSE_STARTED,
                        ResponseStep.ON_READ_COMPLETED,
                        ResponseStep.ON_WRITE_COMPLETED,
                        ResponseStep.ON_TRAILERS);
        assertThat(mError).isNull();

        mResponseStep = ResponseStep.ON_READ_COMPLETED;
        mResponseInfo = info;

        final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
        mHttpResponseDataLength += bytesRead;
        final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
        // Rewind byteBuffer.position() to pre-read() position.
        byteBuffer.position(mBufferPositionBeforeRead);
        // This restores byteBuffer.position() to its value on entrance to
        // this function.
        byteBuffer.get(lastDataReceivedAsBytes);

        // Decode with an explicit charset so the accumulated string does not depend on the
        // platform default charset of whatever runtime executes the test.
        // NOTE(review): each chunk is decoded independently, so a multi-byte sequence split
        // across two reads would not be reassembled.
        mResponseAsString += new String(lastDataReceivedAsBytes, StandardCharsets.UTF_8);

        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        // Do not read if EOF has been reached.
        if (!endOfStream) {
            startNextRead(stream);
        }
    }

    @Override
    public void onWriteCompleted(
            BidirectionalStream stream,
            UrlResponseInfo info,
            ByteBuffer buffer,
            boolean endOfStream) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
        mResponseInfo = info;
        // Writes are expected to be acknowledged in the order they were queued.
        if (!mWriteBuffersToBeAcked.isEmpty()) {
            assertThat(mWriteBuffersToBeAcked.get(0).mBuffer).isEqualTo(buffer);
            mWriteBuffersToBeAcked.remove(0);
        }
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseTrailersReceived(
            BidirectionalStream stream,
            UrlResponseInfo info,
            UrlResponseInfo.HeaderBlock trailers) {
        checkOnValidThread();
        assertThat(stream.isDone()).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_TRAILERS;
        mResponseInfo = info;
        mTrailers = trailers;
        // Nothing to advance after trailers, so the return value is irrelevant here.
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertThat(stream.isDone()).isTrue();
        assertThat(mResponseStep)
                .isAnyOf(
                        ResponseStep.ON_RESPONSE_STARTED,
                        ResponseStep.ON_READ_COMPLETED,
                        ResponseStep.ON_WRITE_COMPLETED,
                        ResponseStep.ON_TRAILERS);
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mOnCanceledCalled).isFalse();
        assertThat(mError).isNull();
        // All queued data must have been both written and acknowledged.
        assertThat(mWriteBuffers).isEmpty();
        assertThat(mWriteBuffersToBeAcked).isEmpty();

        mResponseStep = ResponseStep.ON_SUCCEEDED;
        mResponseInfo = info;
        openDone();
        mBlockOnTerminalState.block();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error) {
        checkOnValidThread();
        assertThat(stream.isDone()).isTrue();
        // Shouldn't happen after success.
        assertThat(mResponseStep).isNotEqualTo(ResponseStep.ON_SUCCEEDED);
        // Should happen at most once for a single stream.
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mOnCanceledCalled).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_FAILED;
        mResponseInfo = info;

        mOnErrorCalled = true;
        mError = error;
        openDone();
        mBlockOnTerminalState.block();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertThat(stream.isDone()).isTrue();
        // Should happen at most once for a single stream.
        assertThat(mOnCanceledCalled).isFalse();
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mError).isNull();
        mResponseStep = ResponseStep.ON_CANCELED;
        mResponseInfo = info;

        mOnCanceledCalled = true;
        openDone();
        mBlockOnTerminalState.block();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    /**
     * Starts a read into a newly allocated direct buffer of {@code READ_BUFFER_SIZE} bytes.
     *
     * @param stream the stream to read from.
     */
    public void startNextRead(BidirectionalStream stream) {
        startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
    }

    /**
     * Starts a read into {@code buffer}, remembering the buffer's current position so that
     * {@code onReadCompleted} can work out how many bytes were read.
     *
     * @param stream the stream to read from.
     * @param buffer the buffer to read into.
     */
    public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
        mBufferPositionBeforeRead = buffer.position();
        stream.read(buffer);
    }

    /**
     * Writes queued buffers to {@code stream}, in order, up to and including the first buffer
     * marked for flushing, removing each from the pending queue as it is written. The write of
     * the last queued buffer is issued with end-of-stream set. Does nothing if no data is queued.
     *
     * @param stream the stream to write to.
     */
    public void startNextWrite(BidirectionalStream stream) {
        Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
        while (iterator.hasNext()) {
            WriteBuffer b = iterator.next();
            // endOfStream is true only when no further buffers are queued behind this one.
            stream.write(b.mBuffer, !iterator.hasNext());
            iterator.remove();
            if (b.mFlush) {
                stream.flush();
                break;
            }
        }
    }

    /**
     * Returns whether a terminal callback has been reached, waiting at most one millisecond.
     */
    public boolean isDone() {
        // It's not mentioned by the Android docs, but block(0) seems to block
        // indefinitely, so have to block for one millisecond to get state
        // without blocking.
        return mDone.block(1);
    }

    /** Returns the number of pending Writes. */
    public int numPendingWrites() {
        return mWriteBuffers.size();
    }

    /**
     * Asserts that there is no callback error before trying to access responseInfo. Only use this
     * when you expect {@code mError} to be null.
     *
     * @return {@link UrlResponseInfo}
     */
    public UrlResponseInfo getResponseInfoWithChecks() {
        assertThat(mError).isNull();
        assertThat(mOnErrorCalled).isFalse();
        assertThat(mResponseInfo).isNotNull();
        return mResponseInfo;
    }

    /**
     * Simply returns {@code mResponseInfo} with no nullability or error checks.
     *
     * @return {@link UrlResponseInfo}
     */
    public UrlResponseInfo getResponseInfo() {
        return mResponseInfo;
    }

    /** Signals that the stream has reached a terminal callback. */
    protected void openDone() {
        mDone.open();
    }

    /**
     * Applies the configured failure action, or pauses, for the current step.
     *
     * <p>If the current step is not the configured failure step (or no failure is configured):
     * with auto-advance disabled, {@code stepBlock} is opened and {@code true} is returned;
     * otherwise {@code false} is returned. If the current step is the failure step, either an
     * exception is thrown ({@code THROW_SYNC}) or the stream is cancelled, inline for
     * {@code CANCEL_SYNC} and via {@link #getExecutor()} for the async variants. In that case
     * {@code stepBlock} is not opened.
     *
     * @param stream the stream to cancel if cancellation is configured for this step.
     * @param stepBlock the condition variable to open when pausing on this step.
     * @return {@code false} if the callback should continue to advance the stream.
     * @throws IllegalStateException if {@code THROW_SYNC} is configured for the current step.
     */
    private boolean maybeThrowCancelOrPause(
            final BidirectionalStream stream, ConditionVariable stepBlock) {
        if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
            if (!mAutoAdvance) {
                stepBlock.open();
                return true;
            }
            return false;
        }

        if (mFailureType == FailureType.THROW_SYNC) {
            throw new IllegalStateException("Callback Exception.");
        }
        Runnable task =
                new Runnable() {
                    @Override
                    public void run() {
                        stream.cancel();
                    }
                };
        if (mFailureType == FailureType.CANCEL_ASYNC
                || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
            getExecutor().execute(task);
        } else {
            task.run();
        }
        // Only CANCEL_ASYNC_WITHOUT_PAUSE keeps advancing the stream after posting the cancel.
        return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
    }

    /** Checks whether callback methods are invoked on the correct thread. */
    private void checkOnValidThread() {
        // With a direct executor callbacks run on whichever thread invokes them, so there is
        // no single expected thread to compare against.
        if (!mUseDirectExecutor) {
            assertThat(Thread.currentThread()).isEqualTo(mExecutorThread);
        }
    }
}