/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package android.net.http.cts.util;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;
import static org.junit.Assume.assumeTrue;

import android.net.http.BidirectionalStream;
import android.net.http.HeaderBlock;
import android.net.http.HttpException;
import android.net.http.UrlResponseInfo;
import android.os.ConditionVariable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

42 /**
43  * Callback that tracks information from different callbacks and has a method to block thread until
44  * the stream completes on another thread. Allows to cancel, block stream or throw an exception from
45  * an arbitrary step.
46  */
47 public class TestBidirectionalStreamCallback implements BidirectionalStream.Callback {
48     private static final int TIMEOUT_MS = 12_000;
49     public UrlResponseInfo mResponseInfo;
50     public HttpException mError;
51 
52     public ResponseStep mResponseStep = ResponseStep.NOTHING;
53 
54     public boolean mOnErrorCalled;
55     public boolean mOnCanceledCalled;
56 
57     public int mHttpResponseDataLength;
58     public String mResponseAsString = "";
59 
60     public HeaderBlock mTrailers;
61 
62     private static final int READ_BUFFER_SIZE = 32 * 1024;
63 
64     // When false, the consumer is responsible for all calls into the stream
65     // that advance it.
66     private boolean mAutoAdvance = true;
67 
68     // Conditionally fail on certain steps.
69     private FailureType mFailureType = FailureType.NONE;
70     private ResponseStep mFailureStep = ResponseStep.NOTHING;
71 
72     // Signals when the stream is done either successfully or not.
73     private final ConditionVariable mDone = new ConditionVariable();
74 
75     // Signaled on each step when mAutoAdvance is false.
76     private final ConditionVariable mReadStepBlock = new ConditionVariable();
77     private final ConditionVariable mWriteStepBlock = new ConditionVariable();
78 
79     // Executor Service for Cronet callbacks.
80     private final ExecutorService mExecutorService =
81             Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
82     private Thread mExecutorThread;
83 
84     // position() of ByteBuffer prior to read() call.
85     private int mBufferPositionBeforeRead;
86 
87     // Data to write.
88     private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();
89 
90     // Buffers that we yet to receive the corresponding onWriteCompleted callback.
91     private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteBuffer>();
92 
93     // Whether to use a direct executor.
94     private final boolean mUseDirectExecutor;
95     private final DirectExecutor mDirectExecutor;
96 
97     private class ExecutorThreadFactory implements ThreadFactory {
98         @Override
newThread(Runnable r)99         public Thread newThread(Runnable r) {
100             mExecutorThread = new Thread(r);
101             return mExecutorThread;
102         }
103     }
104 
105     private static class WriteBuffer {
106         final ByteBuffer mBuffer;
107         final boolean mFlush;
108 
WriteBuffer(ByteBuffer buffer, boolean flush)109         WriteBuffer(ByteBuffer buffer, boolean flush) {
110             mBuffer = buffer;
111             mFlush = flush;
112         }
113     }
114 
115     private static class DirectExecutor implements Executor {
116         @Override
execute(Runnable task)117         public void execute(Runnable task) {
118             task.run();
119         }
120     }
121 
122     public enum ResponseStep {
123         NOTHING,
124         ON_STREAM_READY,
125         ON_RESPONSE_STARTED,
126         ON_READ_COMPLETED,
127         ON_WRITE_COMPLETED,
128         ON_TRAILERS,
129         ON_CANCELED,
130         ON_FAILED,
131         ON_SUCCEEDED,
132     }
133 
134     public enum FailureType {
135         NONE,
136         CANCEL_SYNC,
137         CANCEL_ASYNC,
138         // Same as above, but continues to advance the stream after posting
139         // the cancellation task.
140         CANCEL_ASYNC_WITHOUT_PAUSE,
141         THROW_SYNC
142     }
143 
isTerminalCallback(ResponseStep step)144     private boolean isTerminalCallback(ResponseStep step) {
145         switch (step) {
146             case ON_SUCCEEDED:
147             case ON_CANCELED:
148             case ON_FAILED:
149                 return true;
150             default:
151                 return false;
152         }
153     }
154 
TestBidirectionalStreamCallback()155     public TestBidirectionalStreamCallback() {
156         mUseDirectExecutor = false;
157         mDirectExecutor = null;
158     }
159 
TestBidirectionalStreamCallback(boolean useDirectExecutor)160     public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
161         mUseDirectExecutor = useDirectExecutor;
162         mDirectExecutor = new DirectExecutor();
163     }
164 
setAutoAdvance(boolean autoAdvance)165     public void setAutoAdvance(boolean autoAdvance) {
166         mAutoAdvance = autoAdvance;
167     }
168 
setFailure(FailureType failureType, ResponseStep failureStep)169     public void setFailure(FailureType failureType, ResponseStep failureStep) {
170         mFailureStep = failureStep;
171         mFailureType = failureType;
172     }
173 
blockForDone()174     public boolean blockForDone() {
175         return mDone.block(TIMEOUT_MS);
176     }
177 
178     /**
179      * Waits for a terminal callback to complete execution before failing if the callback is not the
180      * expected one
181      *
182      * @param expectedStep the expected callback step
183      */
expectCallback(ResponseStep expectedStep)184     public void expectCallback(ResponseStep expectedStep) {
185         if (isTerminalCallback(expectedStep)) {
186             assertTrue(String.format(
187                             "Request timed out. Expected %s callback. Current callback is %s",
188                             expectedStep, mResponseStep),
189                     blockForDone());
190         }
191         assertSame(expectedStep, mResponseStep);
192     }
193 
194     /**
195      * Waits for a terminal callback to complete execution before skipping the test if the callback
196      * is not the expected one
197      *
198      * @param expectedStep the expected callback step
199      */
assumeCallback(ResponseStep expectedStep)200     public void assumeCallback(ResponseStep expectedStep) {
201         if (isTerminalCallback(expectedStep)) {
202             assumeTrue(
203                     String.format(
204                             "Request timed out. Expected %s callback. Current callback is %s",
205                             expectedStep, mResponseStep),
206                     blockForDone());
207         }
208         assumeThat(expectedStep, equalTo(mResponseStep));
209     }
210 
waitForNextReadStep()211     public void waitForNextReadStep() {
212         mReadStepBlock.block();
213         mReadStepBlock.close();
214     }
215 
waitForNextWriteStep()216     public void waitForNextWriteStep() {
217         mWriteStepBlock.block();
218         mWriteStepBlock.close();
219     }
220 
getExecutor()221     public Executor getExecutor() {
222         if (mUseDirectExecutor) {
223             return mDirectExecutor;
224         }
225         return mExecutorService;
226     }
227 
shutdownExecutor()228     public void shutdownExecutor() {
229         if (mUseDirectExecutor) {
230             throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
231         }
232         mExecutorService.shutdown();
233     }
234 
addWriteData(byte[] data)235     public void addWriteData(byte[] data) {
236         addWriteData(data, true);
237     }
238 
addWriteData(byte[] data, boolean flush)239     public void addWriteData(byte[] data, boolean flush) {
240         ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
241         writeBuffer.put(data);
242         writeBuffer.flip();
243         mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
244         mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
245     }
246 
247     @Override
onStreamReady(BidirectionalStream stream)248     public void onStreamReady(BidirectionalStream stream) {
249         checkOnValidThread();
250         assertFalse(stream.isDone());
251         assertEquals(ResponseStep.NOTHING, mResponseStep);
252         assertNull(mError);
253         mResponseStep = ResponseStep.ON_STREAM_READY;
254         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
255             return;
256         }
257         startNextWrite(stream);
258     }
259 
260     @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)261     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
262         checkOnValidThread();
263         assertFalse(stream.isDone());
264         assertTrue(
265                 mResponseStep == ResponseStep.NOTHING
266                         || mResponseStep == ResponseStep.ON_STREAM_READY
267                         || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
268         assertNull(mError);
269 
270         mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
271         mResponseInfo = info;
272         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
273             return;
274         }
275         startNextRead(stream);
276     }
277 
278     @Override
onReadCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuffer, boolean endOfStream)279     public void onReadCompleted(
280             BidirectionalStream stream,
281             UrlResponseInfo info,
282             ByteBuffer byteBuffer,
283             boolean endOfStream) {
284         checkOnValidThread();
285         assertFalse(stream.isDone());
286         assertTrue(
287                 mResponseStep == ResponseStep.ON_RESPONSE_STARTED
288                         || mResponseStep == ResponseStep.ON_READ_COMPLETED
289                         || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
290                         || mResponseStep == ResponseStep.ON_TRAILERS);
291         assertNull(mError);
292 
293         mResponseStep = ResponseStep.ON_READ_COMPLETED;
294         mResponseInfo = info;
295 
296         final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
297         mHttpResponseDataLength += bytesRead;
298         final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
299         // Rewind byteBuffer.position() to pre-read() position.
300         byteBuffer.position(mBufferPositionBeforeRead);
301         // This restores byteBuffer.position() to its value on entrance to
302         // this function.
303         byteBuffer.get(lastDataReceivedAsBytes);
304 
305         mResponseAsString += new String(lastDataReceivedAsBytes);
306 
307         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
308             return;
309         }
310         // Do not read if EOF has been reached.
311         if (!endOfStream) {
312             startNextRead(stream);
313         }
314     }
315 
316     @Override
onWriteCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)317     public void onWriteCompleted(
318             BidirectionalStream stream,
319             UrlResponseInfo info,
320             ByteBuffer buffer,
321             boolean endOfStream) {
322         checkOnValidThread();
323         assertFalse(stream.isDone());
324         assertNull(mError);
325         mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
326         mResponseInfo = info;
327         if (!mWriteBuffersToBeAcked.isEmpty()) {
328             assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
329             mWriteBuffersToBeAcked.remove(0);
330         }
331         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
332             return;
333         }
334         startNextWrite(stream);
335     }
336 
337     @Override
onResponseTrailersReceived( BidirectionalStream stream, UrlResponseInfo info, HeaderBlock trailers)338     public void onResponseTrailersReceived(
339             BidirectionalStream stream,
340             UrlResponseInfo info,
341             HeaderBlock trailers) {
342         checkOnValidThread();
343         assertFalse(stream.isDone());
344         assertNull(mError);
345         mResponseStep = ResponseStep.ON_TRAILERS;
346         mResponseInfo = info;
347         mTrailers = trailers;
348         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
349             return;
350         }
351     }
352 
353     @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)354     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
355         checkOnValidThread();
356         assertTrue(stream.isDone());
357         assertTrue(
358                 mResponseStep == ResponseStep.ON_RESPONSE_STARTED
359                         || mResponseStep == ResponseStep.ON_READ_COMPLETED
360                         || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
361                         || mResponseStep == ResponseStep.ON_TRAILERS);
362         assertFalse(mOnErrorCalled);
363         assertFalse(mOnCanceledCalled);
364         assertNull(mError);
365         assertEquals(0, mWriteBuffers.size());
366         assertEquals(0, mWriteBuffersToBeAcked.size());
367 
368         mResponseStep = ResponseStep.ON_SUCCEEDED;
369         mResponseInfo = info;
370         openDone();
371         maybeThrowCancelOrPause(stream, mReadStepBlock);
372     }
373 
374     @Override
onFailed(BidirectionalStream stream, UrlResponseInfo info, HttpException error)375     public void onFailed(BidirectionalStream stream, UrlResponseInfo info, HttpException error) {
376         checkOnValidThread();
377         assertTrue(stream.isDone());
378         // Shouldn't happen after success.
379         assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
380         // Should happen at most once for a single stream.
381         assertFalse(mOnErrorCalled);
382         assertFalse(mOnCanceledCalled);
383         assertNull(mError);
384         mResponseStep = ResponseStep.ON_FAILED;
385         mResponseInfo = info;
386 
387         mOnErrorCalled = true;
388         mError = error;
389         openDone();
390         maybeThrowCancelOrPause(stream, mReadStepBlock);
391     }
392 
393     @Override
onCanceled(BidirectionalStream stream, UrlResponseInfo info)394     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
395         checkOnValidThread();
396         assertTrue(stream.isDone());
397         // Should happen at most once for a single stream.
398         assertFalse(mOnCanceledCalled);
399         assertFalse(mOnErrorCalled);
400         assertNull(mError);
401         mResponseStep = ResponseStep.ON_CANCELED;
402         mResponseInfo = info;
403 
404         mOnCanceledCalled = true;
405         openDone();
406         maybeThrowCancelOrPause(stream, mReadStepBlock);
407     }
408 
startNextRead(BidirectionalStream stream)409     public void startNextRead(BidirectionalStream stream) {
410         startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
411     }
412 
startNextRead(BidirectionalStream stream, ByteBuffer buffer)413     public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
414         mBufferPositionBeforeRead = buffer.position();
415         stream.read(buffer);
416     }
417 
startNextWrite(BidirectionalStream stream)418     public void startNextWrite(BidirectionalStream stream) {
419         if (!mWriteBuffers.isEmpty()) {
420             Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
421             while (iterator.hasNext()) {
422                 WriteBuffer b = iterator.next();
423                 stream.write(b.mBuffer, !iterator.hasNext());
424                 iterator.remove();
425                 if (b.mFlush) {
426                     stream.flush();
427                     break;
428                 }
429             }
430         }
431     }
432 
isDone()433     public boolean isDone() {
434         // It's not mentioned by the Android docs, but block(0) seems to block
435         // indefinitely, so have to block for one millisecond to get state
436         // without blocking.
437         return mDone.block(1);
438     }
439 
440     /** Returns the number of pending Writes. */
numPendingWrites()441     public int numPendingWrites() {
442         return mWriteBuffers.size();
443     }
444 
openDone()445     protected void openDone() {
446         mDone.open();
447     }
448 
449     /** Returns {@code false} if the callback should continue to advance the stream. */
maybeThrowCancelOrPause( final BidirectionalStream stream, ConditionVariable stepBlock)450     private boolean maybeThrowCancelOrPause(
451             final BidirectionalStream stream, ConditionVariable stepBlock) {
452         if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
453             if (!mAutoAdvance) {
454                 stepBlock.open();
455                 return true;
456             }
457             return false;
458         }
459 
460         if (mFailureType == FailureType.THROW_SYNC) {
461             throw new IllegalStateException("Callback Exception.");
462         }
463         Runnable task =
464                 new Runnable() {
465                     @Override
466                     public void run() {
467                         stream.cancel();
468                     }
469                 };
470         if (mFailureType == FailureType.CANCEL_ASYNC
471                 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
472             getExecutor().execute(task);
473         } else {
474             task.run();
475         }
476         return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
477     }
478 
479     /** Checks whether callback methods are invoked on the correct thread. */
checkOnValidThread()480     private void checkOnValidThread() {
481         if (!mUseDirectExecutor) {
482             assertEquals(mExecutorThread, Thread.currentThread());
483         }
484     }
485 }
486