/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package android.net.http.cts.util;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;
import static org.junit.Assume.assumeTrue;

import android.net.http.BidirectionalStream;
import android.net.http.HeaderBlock;
import android.net.http.HttpException;
import android.net.http.UrlResponseInfo;
import android.os.ConditionVariable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * Callback that tracks information from different callbacks and has a method to block the calling
 * thread until the stream completes on another thread. Allows cancelling, pausing the stream, or
 * throwing an exception at an arbitrary step.
 *
 * <p>Every callback records the step it represents in {@link #mResponseStep} and asserts that the
 * step it follows is one of the steps it is allowed to follow.
 */
public class TestBidirectionalStreamCallback implements BidirectionalStream.Callback {
    private static final int TIMEOUT_MS = 12_000;
    private static final int READ_BUFFER_SIZE = 32 * 1024;

    /** Response info passed to the most recent callback that carried one. */
    public UrlResponseInfo mResponseInfo;
    /** Error passed to {@link #onFailed}, or {@code null} if it was not invoked. */
    public HttpException mError;

    /** The step recorded by the most recently invoked callback. */
    public ResponseStep mResponseStep = ResponseStep.NOTHING;

    public boolean mOnErrorCalled;
    public boolean mOnCanceledCalled;

    /** Total number of response body bytes observed by {@link #onReadCompleted}. */
    public int mHttpResponseDataLength;
    /** Concatenation of every chunk observed by {@link #onReadCompleted}, decoded as UTF-8. */
    public String mResponseAsString = "";

    /** Trailers passed to {@link #onResponseTrailersReceived}, if it was invoked. */
    public HeaderBlock mTrailers;

    // When false, the consumer is responsible for all calls into the stream
    // that advance it.
    private boolean mAutoAdvance = true;

    // Conditionally fail on certain steps.
    private FailureType mFailureType = FailureType.NONE;
    private ResponseStep mFailureStep = ResponseStep.NOTHING;

    // Signals when the stream is done either successfully or not.
    private final ConditionVariable mDone = new ConditionVariable();

    // Signaled on each step when mAutoAdvance is false.
    private final ConditionVariable mReadStepBlock = new ConditionVariable();
    private final ConditionVariable mWriteStepBlock = new ConditionVariable();

    // Executor Service for Cronet callbacks.
    private final ExecutorService mExecutorService =
            Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
    private Thread mExecutorThread;

    // position() of ByteBuffer prior to read() call.
    private int mBufferPositionBeforeRead;

    // Data to write.
    private final List<WriteBuffer> mWriteBuffers = new ArrayList<>();

    // Buffers for which we have yet to receive the corresponding onWriteCompleted callback.
    private final List<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<>();

    // Whether to use a direct executor.
    private final boolean mUseDirectExecutor;
    private final DirectExecutor mDirectExecutor;

    /**
     * Thread factory that remembers the thread it created so that {@link #checkOnValidThread()}
     * can compare against it. Deliberately an inner (non-static) class: it writes to the enclosing
     * instance's {@code mExecutorThread}.
     */
    private class ExecutorThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            mExecutorThread = new Thread(r);
            return mExecutorThread;
        }
    }

    /** A buffer queued for writing together with its "flush after this write" flag. */
    private static class WriteBuffer {
        final ByteBuffer mBuffer;
        final boolean mFlush;

        WriteBuffer(ByteBuffer buffer, boolean flush) {
            mBuffer = buffer;
            mFlush = flush;
        }
    }

    /** Executor that runs each task inline on the calling thread. */
    private static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    }

    /** Steps recorded in {@link #mResponseStep}; one per callback, plus the initial state. */
    public enum ResponseStep {
        NOTHING,
        ON_STREAM_READY,
        ON_RESPONSE_STARTED,
        ON_READ_COMPLETED,
        ON_WRITE_COMPLETED,
        ON_TRAILERS,
        ON_CANCELED,
        ON_FAILED,
        ON_SUCCEEDED,
    }

    /** What to do when the callback reaches the step configured via {@link #setFailure}. */
    public enum FailureType {
        NONE,
        CANCEL_SYNC,
        CANCEL_ASYNC,
        // Same as above, but continues to advance the stream after posting
        // the cancellation task.
        CANCEL_ASYNC_WITHOUT_PAUSE,
        THROW_SYNC
    }

    /** Returns {@code true} for the steps after which {@link #openDone()} is invoked. */
    private static boolean isTerminalCallback(ResponseStep step) {
        switch (step) {
            case ON_SUCCEEDED:
            case ON_CANCELED:
            case ON_FAILED:
                return true;
            default:
                return false;
        }
    }

    /** Creates a callback whose {@link #getExecutor()} is a single-thread executor service. */
    public TestBidirectionalStreamCallback() {
        mUseDirectExecutor = false;
        mDirectExecutor = null;
    }

    /**
     * Creates a callback.
     *
     * @param useDirectExecutor when {@code true}, {@link #getExecutor()} returns an executor that
     *     runs tasks inline and thread checks in callbacks are skipped
     */
    public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
        mUseDirectExecutor = useDirectExecutor;
        mDirectExecutor = new DirectExecutor();
    }

    /**
     * Sets whether callbacks advance the stream (start the next read/write) on their own.
     *
     * @param autoAdvance {@code false} to make each callback only signal its step and return
     */
    public void setAutoAdvance(boolean autoAdvance) {
        mAutoAdvance = autoAdvance;
    }

    /**
     * Configures the failure to inject.
     *
     * @param failureType what to do once {@code failureStep} is reached
     * @param failureStep the step at which to apply {@code failureType}
     */
    public void setFailure(FailureType failureType, ResponseStep failureStep) {
        mFailureStep = failureStep;
        mFailureType = failureType;
    }

    /**
     * Blocks until a terminal callback has run or {@code TIMEOUT_MS} milliseconds have elapsed.
     *
     * @return the result of {@link ConditionVariable#block(long)} on the "done" condition
     */
    public boolean blockForDone() {
        return mDone.block(TIMEOUT_MS);
    }

    /**
     * Waits for a terminal callback to complete execution before failing if the callback is not the
     * expected one.
     *
     * @param expectedStep the expected callback step
     */
    public void expectCallback(ResponseStep expectedStep) {
        if (isTerminalCallback(expectedStep)) {
            // Wait before building the message: arguments are evaluated left to right, so
            // formatting first would report the step observed before the wait started.
            final boolean completed = blockForDone();
            assertTrue(
                    String.format(
                            "Request timed out. Expected %s callback. Current callback is %s",
                            expectedStep, mResponseStep),
                    completed);
        }
        assertSame(expectedStep, mResponseStep);
    }

    /**
     * Waits for a terminal callback to complete execution before skipping the test if the callback
     * is not the expected one.
     *
     * @param expectedStep the expected callback step
     */
    public void assumeCallback(ResponseStep expectedStep) {
        if (isTerminalCallback(expectedStep)) {
            // Same ordering concern as in expectCallback: wait first, then describe the state.
            final boolean completed = blockForDone();
            assumeTrue(
                    String.format(
                            "Request timed out. Expected %s callback. Current callback is %s",
                            expectedStep, mResponseStep),
                    completed);
        }
        // Actual value first, matcher on the expected value, so a mismatch is described correctly.
        assumeThat(mResponseStep, equalTo(expectedStep));
    }

    /** Blocks (without timeout) until a read-side step is signaled, then re-arms the signal. */
    public void waitForNextReadStep() {
        mReadStepBlock.block();
        mReadStepBlock.close();
    }

    /** Blocks (without timeout) until a write-side step is signaled, then re-arms the signal. */
    public void waitForNextWriteStep() {
        mWriteStepBlock.block();
        mWriteStepBlock.close();
    }

    /** Returns the direct executor if one was requested, otherwise the executor service. */
    public Executor getExecutor() {
        if (mUseDirectExecutor) {
            return mDirectExecutor;
        }
        return mExecutorService;
    }

    /**
     * Shuts down the executor service.
     *
     * @throws UnsupportedOperationException if this callback uses the direct executor
     */
    public void shutdownExecutor() {
        if (mUseDirectExecutor) {
            throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
        }
        mExecutorService.shutdown();
    }

    /** Queues {@code data} for writing, with a flush after it. */
    public void addWriteData(byte[] data) {
        addWriteData(data, true);
    }

    /**
     * Copies {@code data} into a direct buffer and queues it for writing.
     *
     * @param data bytes to write
     * @param flush whether {@link #startNextWrite} should flush the stream after this buffer
     */
    public void addWriteData(byte[] data, boolean flush) {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
        writeBuffer.put(data);
        writeBuffer.flip();
        mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
        mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
    }

    @Override
    public void onStreamReady(BidirectionalStream stream) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertEquals(ResponseStep.NOTHING, mResponseStep);
        assertNull(mError);
        mResponseStep = ResponseStep.ON_STREAM_READY;
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertTrue(
                mResponseStep == ResponseStep.NOTHING
                        || mResponseStep == ResponseStep.ON_STREAM_READY
                        || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
        assertNull(mError);

        mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
        mResponseInfo = info;
        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        startNextRead(stream);
    }

    @Override
    public void onReadCompleted(
            BidirectionalStream stream,
            UrlResponseInfo info,
            ByteBuffer byteBuffer,
            boolean endOfStream) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertTrue(
                mResponseStep == ResponseStep.ON_RESPONSE_STARTED
                        || mResponseStep == ResponseStep.ON_READ_COMPLETED
                        || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
                        || mResponseStep == ResponseStep.ON_TRAILERS);
        assertNull(mError);

        mResponseStep = ResponseStep.ON_READ_COMPLETED;
        mResponseInfo = info;

        final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
        mHttpResponseDataLength += bytesRead;
        final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
        // Rewind byteBuffer.position() to pre-read() position.
        byteBuffer.position(mBufferPositionBeforeRead);
        // This restores byteBuffer.position() to its value on entrance to
        // this function.
        byteBuffer.get(lastDataReceivedAsBytes);

        // Decode with an explicit charset rather than the platform default.
        // NOTE(review): each chunk is decoded on its own, so a multi-byte character split across
        // two reads would not be reassembled.
        mResponseAsString += new String(lastDataReceivedAsBytes, StandardCharsets.UTF_8);

        if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
            return;
        }
        // Do not read if EOF has been reached.
        if (!endOfStream) {
            startNextRead(stream);
        }
    }

    @Override
    public void onWriteCompleted(
            BidirectionalStream stream,
            UrlResponseInfo info,
            ByteBuffer buffer,
            boolean endOfStream) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertNull(mError);
        mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
        mResponseInfo = info;
        // Writes are expected to be acknowledged in the order they were queued.
        if (!mWriteBuffersToBeAcked.isEmpty()) {
            assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
            mWriteBuffersToBeAcked.remove(0);
        }
        if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
            return;
        }
        startNextWrite(stream);
    }

    @Override
    public void onResponseTrailersReceived(
            BidirectionalStream stream,
            UrlResponseInfo info,
            HeaderBlock trailers) {
        checkOnValidThread();
        assertFalse(stream.isDone());
        assertNull(mError);
        mResponseStep = ResponseStep.ON_TRAILERS;
        mResponseInfo = info;
        mTrailers = trailers;
        // Nothing to advance after trailers; the call is only for its failure/pause side effects.
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertTrue(stream.isDone());
        assertTrue(
                mResponseStep == ResponseStep.ON_RESPONSE_STARTED
                        || mResponseStep == ResponseStep.ON_READ_COMPLETED
                        || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
                        || mResponseStep == ResponseStep.ON_TRAILERS);
        assertFalse(mOnErrorCalled);
        assertFalse(mOnCanceledCalled);
        assertNull(mError);
        assertEquals(0, mWriteBuffers.size());
        assertEquals(0, mWriteBuffersToBeAcked.size());

        mResponseStep = ResponseStep.ON_SUCCEEDED;
        mResponseInfo = info;
        openDone();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onFailed(BidirectionalStream stream, UrlResponseInfo info, HttpException error) {
        checkOnValidThread();
        assertTrue(stream.isDone());
        // Shouldn't happen after success.
        assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
        // Should happen at most once for a single stream.
        assertFalse(mOnErrorCalled);
        assertFalse(mOnCanceledCalled);
        assertNull(mError);
        mResponseStep = ResponseStep.ON_FAILED;
        mResponseInfo = info;

        mOnErrorCalled = true;
        mError = error;
        openDone();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    @Override
    public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
        checkOnValidThread();
        assertTrue(stream.isDone());
        // Should happen at most once for a single stream.
        assertFalse(mOnCanceledCalled);
        assertFalse(mOnErrorCalled);
        assertNull(mError);
        mResponseStep = ResponseStep.ON_CANCELED;
        mResponseInfo = info;

        mOnCanceledCalled = true;
        openDone();
        maybeThrowCancelOrPause(stream, mReadStepBlock);
    }

    /** Starts a read into a freshly allocated direct buffer of {@code READ_BUFFER_SIZE} bytes. */
    public void startNextRead(BidirectionalStream stream) {
        startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
    }

    /**
     * Records the buffer's current position and starts a read into it.
     *
     * @param stream the stream to read from
     * @param buffer the buffer to read into
     */
    public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
        mBufferPositionBeforeRead = buffer.position();
        stream.read(buffer);
    }

    /**
     * Hands queued buffers to the stream, in order, up to and including the first one marked for
     * flushing, and flushes after that one. The buffer that empties the queue is written with the
     * end-of-stream flag set.
     */
    public void startNextWrite(BidirectionalStream stream) {
        Iterator<WriteBuffer> pending = mWriteBuffers.iterator();
        while (pending.hasNext()) {
            WriteBuffer next = pending.next();
            // The second argument is true only for the last queued buffer.
            stream.write(next.mBuffer, !pending.hasNext());
            pending.remove();
            if (next.mFlush) {
                stream.flush();
                break;
            }
        }
    }

    /** Returns whether a terminal callback has already run, waiting at most one millisecond. */
    public boolean isDone() {
        // It's not mentioned by the Android docs, but block(0) seems to block
        // indefinitely, so have to block for one millisecond to get state
        // without blocking.
        return mDone.block(1);
    }

    /** Returns the number of pending Writes. */
    public int numPendingWrites() {
        return mWriteBuffers.size();
    }

    /** Opens the "done" condition, releasing {@link #blockForDone()} callers. */
    protected void openDone() {
        mDone.open();
    }

    /**
     * Applies the configured failure if the current step is the configured failure step;
     * otherwise, when auto-advance is off, signals {@code stepBlock} so a waiting consumer can
     * proceed.
     *
     * @param stream the stream to cancel when a cancel failure type applies
     * @param stepBlock the condition to open when pausing at this step
     * @return {@code false} if the callback should continue to advance the stream
     * @throws IllegalStateException if the failure type is {@link FailureType#THROW_SYNC} and the
     *     current step is the configured failure step
     */
    private boolean maybeThrowCancelOrPause(
            final BidirectionalStream stream, ConditionVariable stepBlock) {
        if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
            if (!mAutoAdvance) {
                stepBlock.open();
                return true;
            }
            return false;
        }

        if (mFailureType == FailureType.THROW_SYNC) {
            throw new IllegalStateException("Callback Exception.");
        }
        Runnable task =
                new Runnable() {
                    @Override
                    public void run() {
                        stream.cancel();
                    }
                };
        if (mFailureType == FailureType.CANCEL_ASYNC
                || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
            getExecutor().execute(task);
        } else {
            task.run();
        }
        return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
    }

    /** Checks whether callback methods are invoked on the correct thread. */
    private void checkOnValidThread() {
        if (!mUseDirectExecutor) {
            assertEquals(mExecutorThread, Thread.currentThread());
        }
    }
}