// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.net;

import android.net.http.BidirectionalStream;
import android.net.http.HttpException;
import android.net.http.HeaderBlock;
import android.net.http.UrlResponseInfo;
import android.os.ConditionVariable;

import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertNull;
import static junit.framework.Assert.assertTrue;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Callback that tracks information from the different stream callbacks and has a
 * method to block the calling thread until the stream completes on another thread.
 * Allows canceling, blocking the stream, or throwing an exception from an arbitrary step.
 */
public class TestBidirectionalStreamCallback implements BidirectionalStream.Callback {
    /** Most recent {@link UrlResponseInfo} passed to any callback that receives one. */
    public UrlResponseInfo mResponseInfo;
    /** Error passed to {@link #onFailed}, or {@code null} if it has not been invoked. */
    public HttpException mError;

    /** The last callback step that was entered. */
    public ResponseStep mResponseStep = ResponseStep.NOTHING;

    public boolean mOnErrorCalled;
    public boolean mOnCanceledCalled;

    /** Total number of bytes observed across all {@link #onReadCompleted} calls. */
    public int mHttpResponseDataLength;
    /** Concatenation of every chunk observed by {@link #onReadCompleted}, decoded as UTF-8. */
    public String mResponseAsString = "";

    /** Trailers passed to {@link #onResponseTrailersReceived}, if any. */
    public HeaderBlock mTrailers;

    private static final int READ_BUFFER_SIZE = 32 * 1024;

    // When false, the consumer is responsible for all calls into the stream
    // that advance it.
    private boolean mAutoAdvance = true;

    // Conditionally fail on certain steps.
    private FailureType mFailureType = FailureType.NONE;
    private ResponseStep mFailureStep = ResponseStep.NOTHING;

    // Signals when the stream is done either successfully or not.
    private final ConditionVariable mDone = new ConditionVariable();

    // Signaled on each step when mAutoAdvance is false.
    private final ConditionVariable mReadStepBlock = new ConditionVariable();
    private final ConditionVariable mWriteStepBlock = new ConditionVariable();

    // Executor Service for Cronet callbacks.
    private final ExecutorService mExecutorService =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
    private Thread mExecutorThread;

    // position() of ByteBuffer prior to read() call.
    private int mBufferPositionBeforeRead;

    // Data to write.
    private final List<WriteBuffer> mWriteBuffers = new ArrayList<>();

    // Buffers for which we have yet to receive the corresponding onWriteCompleted callback.
    private final List<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<>();

    // Whether to use a direct executor.
    private final boolean mUseDirectExecutor;
    private final DirectExecutor mDirectExecutor;

    /**
     * Thread factory that remembers the thread it created in {@code mExecutorThread} so that
     * {@link #checkOnValidThread()} can compare against it.
     */
    private class ExecutorThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            mExecutorThread = new Thread(r);
            return mExecutorThread;
        }
    }

    /** A queued write: the data buffer plus whether to flush right after writing it. */
    private static class WriteBuffer {
        final ByteBuffer mBuffer;
        final boolean mFlush;

        public WriteBuffer(ByteBuffer buffer, boolean flush) {
            mBuffer = buffer;
            mFlush = flush;
        }
    }

    /** Executor that runs each task inline on the calling thread. */
    private static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    }

    /** Identifies the callback step that was most recently entered. */
    public enum ResponseStep {
        NOTHING,
        ON_STREAM_READY,
        ON_RESPONSE_STARTED,
        ON_READ_COMPLETED,
        ON_WRITE_COMPLETED,
        ON_TRAILERS,
        ON_CANCELED,
        ON_FAILED,
        ON_SUCCEEDED,
    }

    /** What to do when the configured failure step is reached. */
    public enum FailureType {
        NONE,
        CANCEL_SYNC,
        CANCEL_ASYNC,
        // Same as above, but continues to advance the stream after posting
        // the cancellation task.
        CANCEL_ASYNC_WITHOUT_PAUSE,
        THROW_SYNC
    }

    /** Creates a callback whose executor is the internal single-thread executor service. */
    public TestBidirectionalStreamCallback() {
        mUseDirectExecutor = false;
        mDirectExecutor = null;
    }

    /**
     * Creates a callback.
     *
     * @param useDirectExecutor when {@code true}, {@link #getExecutor()} returns an executor
     *         that runs tasks inline; otherwise it returns the single-thread executor service.
     */
    public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
        mUseDirectExecutor = useDirectExecutor;
        mDirectExecutor = new DirectExecutor();
    }

    /**
     * Sets whether the callback advances the stream itself.
     *
     * @param autoAdvance when {@code false}, each step only signals the matching step block
     *         and the consumer must issue the next read/write.
     */
    public void setAutoAdvance(boolean autoAdvance) {
        mAutoAdvance = autoAdvance;
    }

    /**
     * Configures a failure to trigger when a given step is reached.
     *
     * @param failureType the action to take.
     * @param failureStep the step at which to take it.
     */
    public void setFailure(FailureType failureType, ResponseStep failureStep) {
        mFailureStep = failureStep;
        mFailureType = failureType;
    }

    /** Blocks until one of the terminal callbacks has opened the done condition. */
    public void blockForDone() {
        mDone.block();
    }

    /** Blocks until the read step block is opened, then closes it again for the next wait. */
    public void waitForNextReadStep() {
        mReadStepBlock.block();
        mReadStepBlock.close();
    }

    /** Blocks until the write step block is opened, then closes it again for the next wait. */
    public void waitForNextWriteStep() {
        mWriteStepBlock.block();
        mWriteStepBlock.close();
    }

    /** Returns the direct executor if one was requested, otherwise the executor service. */
    public Executor getExecutor() {
        return mUseDirectExecutor ? mDirectExecutor : mExecutorService;
    }

    /**
     * Shuts down the executor service.
     *
     * @throws UnsupportedOperationException if this callback uses the direct executor.
     */
    public void shutdownExecutor() {
        if (mUseDirectExecutor) {
            throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
        }
        mExecutorService.shutdown();
    }

    /** Queues {@code data} for writing, flushing immediately after it is written. */
    public void addWriteData(byte[] data) {
        addWriteData(data, true);
    }

    /**
     * Queues {@code data} for writing.
     *
     * @param data bytes to copy into a new direct buffer.
     * @param flush whether {@link #startNextWrite} should flush right after writing this buffer.
     */
    public void addWriteData(byte[] data, boolean flush) {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
        writeBuffer.put(data);
        writeBuffer.flip();
        // The same underlying buffer is tracked twice: once as pending data, once as
        // awaiting acknowledgement via onWriteCompleted.
        mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
        mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
    }

    @Override
    public void onStreamReady(BidirectionalStream stream) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertEquals(ResponseStep.NOTHING, mResponseStep);
        assertNull(mError);
        mResponseStep = ResponseStep.ON_STREAM_READY;
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertTrue(mResponseStep == ResponseStep.NOTHING
                || mResponseStep == ResponseStep.ON_STREAM_READY
                || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
        assertNull(mError);

        mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
        mResponseInfo = info;
        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        startNextRead(stream);
    }

    @Override
    public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
            ByteBuffer byteBuffer, boolean endOfStream) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
                || mResponseStep == ResponseStep.ON_READ_COMPLETED
                || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
                || mResponseStep == ResponseStep.ON_TRAILERS);
        assertNull(mError);

        mResponseStep = ResponseStep.ON_READ_COMPLETED;
        mResponseInfo = info;

        final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
        mHttpResponseDataLength += bytesRead;
        final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
        // Rewind byteBuffer.position() to pre-read() position.
        byteBuffer.position(mBufferPositionBeforeRead);
        // This restores byteBuffer.position() to its value on entrance to
        // this function.
        byteBuffer.get(lastDataReceivedAsBytes);

        // Decode with an explicit charset rather than the platform default.
        // NOTE(review): each chunk is decoded independently, so a multi-byte sequence split
        // across two reads would not decode correctly — confirm tests only rely on this
        // for payloads where that cannot happen.
        mResponseAsString += new String(lastDataReceivedAsBytes, StandardCharsets.UTF_8);

        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        // Do not read if EOF has been reached.
        if (!endOfStream) {
            startNextRead(stream);
        }
    }

    @Override
    public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
            ByteBuffer buffer, boolean endOfStream) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertNull(mError);
        mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
        mResponseInfo = info;
        if (!mWriteBuffersToBeAcked.isEmpty()) {
            // Acknowledgements are expected in the order the buffers were queued.
            assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
            mWriteBuffersToBeAcked.remove(0);
        }
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
            HeaderBlock trailers) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertNull(mError);
        mResponseStep = ResponseStep.ON_TRAILERS;
        mResponseInfo = info;
        mTrailers = trailers;
        // Nothing follows this call, so its return value is irrelevant here.
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertTrue(stream.isDone());
        assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
                || mResponseStep == ResponseStep.ON_READ_COMPLETED
                || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
                || mResponseStep == ResponseStep.ON_TRAILERS);
        assertFalse(mOnErrorCalled);
        assertFalse(mOnCanceledCalled);
        assertNull(mError);
        assertEquals(0, mWriteBuffers.size());
        assertEquals(0, mWriteBuffersToBeAcked.size());

        mResponseStep = ResponseStep.ON_SUCCEEDED;
        mResponseInfo = info;
        openDone();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onFailed(BidirectionalStream stream, UrlResponseInfo info, HttpException error) {
        checkOnValidThread();
        assertTrue(stream.isDone());
        // Shouldn't happen after success.
        assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
        // Should happen at most once for a single stream.
        assertFalse(mOnErrorCalled);
        assertFalse(mOnCanceledCalled);
        assertNull(mError);
        mResponseStep = ResponseStep.ON_FAILED;
        mResponseInfo = info;

        mOnErrorCalled = true;
        mError = error;
        openDone();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertTrue(stream.isDone());
        // Should happen at most once for a single stream.
        assertFalse(mOnCanceledCalled);
        assertFalse(mOnErrorCalled);
        assertNull(mError);
        mResponseStep = ResponseStep.ON_CANCELED;
        mResponseInfo = info;

        mOnCanceledCalled = true;
        openDone();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    /** Starts a read into a freshly allocated direct buffer of {@code READ_BUFFER_SIZE} bytes. */
    public void startNextRead(BidirectionalStream stream) {
        startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
    }

    /**
     * Starts a read into {@code buffer}, recording its current position so that
     * {@link #onReadCompleted} can work out how many bytes were read.
     */
    public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
        mBufferPositionBeforeRead = buffer.position();
        stream.read(buffer);
    }

    /**
     * Writes queued buffers to {@code stream}, removing each from the pending list, up to and
     * including the first buffer marked for flush. The buffer that is last in the pending list
     * is written with the end-of-stream flag set.
     */
    public void startNextWrite(BidirectionalStream stream) {
        Iterator<WriteBuffer> pending = mWriteBuffers.iterator();
        while (pending.hasNext()) {
            WriteBuffer next = pending.next();
            // hasNext() is false exactly when this is the last queued buffer.
            stream.write(next.mBuffer, !pending.hasNext());
            pending.remove();
            if (next.mFlush) {
                stream.flush();
                break;
            }
        }
    }

    /** Returns whether the done condition has been opened, waiting at most one millisecond. */
    public boolean isDone() {
        // It's not mentioned by the Android docs, but block(0) seems to block
        // indefinitely, so have to block for one millisecond to get state
        // without blocking.
        return mDone.block(1);
    }

    /**
     * Returns the number of pending Writes.
     */
    public int numPendingWrites() {
        return mWriteBuffers.size();
    }

    /** Opens the done condition, releasing threads blocked in {@link #blockForDone()}. */
    protected void openDone() {
        mDone.open();
    }

    /**
     * Applies the configured failure if the current step is the failure step; otherwise
     * pauses when auto-advance is off by opening {@code stepBlock}.
     *
     * @param stream the stream to cancel when a cancel failure type is configured.
     * @param stepBlock the condition to open when pausing with auto-advance disabled.
     * @return {@code false} if the callback should continue to advance the stream.
     * @throws IllegalStateException if the failure type is {@link FailureType#THROW_SYNC} and
     *         the failure step has been reached.
     */
    private boolean maybeThrowCancelOrPause(
            final BidirectionalStream stream, ConditionVariable stepBlock) {
        if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
            if (!mAutoAdvance) {
                stepBlock.open();
                return true;
            }
            return false;
        }

        if (mFailureType == FailureType.THROW_SYNC) {
            throw new IllegalStateException("Callback Exception.");
        }
        Runnable task = new Runnable() {
            @Override
            public void run() {
                stream.cancel();
            }
        };
        if (mFailureType == FailureType.CANCEL_ASYNC
                || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
            // Asynchronous cancel: hand the task to the callback executor.
            getExecutor().execute(task);
        } else {
            // Synchronous cancel: run inline from within the callback.
            task.run();
        }
        return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
    }

    /**
     * Checks whether callback methods are invoked on the correct thread.
     */
    private void checkOnValidThread() {
        if (!mUseDirectExecutor) {
            assertEquals(mExecutorThread, Thread.currentThread());
        }
    }
}