// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 package org.chromium.net;
6 
import static com.google.common.truth.Truth.assertThat;

import android.os.ConditionVariable;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
18 
19 /**
20  * Callback that tracks information from different callbacks and and has a
21  * method to block thread until the stream completes on another thread.
22  * Allows to cancel, block stream or throw an exception from an arbitrary step.
23  */
24 public class TestBidirectionalStreamCallback extends BidirectionalStream.Callback {
25     private UrlResponseInfo mResponseInfo;
26     public CronetException mError;
27 
28     public ResponseStep mResponseStep = ResponseStep.NOTHING;
29 
30     public boolean mOnErrorCalled;
31     public boolean mOnCanceledCalled;
32 
33     public int mHttpResponseDataLength;
34     public String mResponseAsString = "";
35 
36     public UrlResponseInfo.HeaderBlock mTrailers;
37 
38     private static final int READ_BUFFER_SIZE = 32 * 1024;
39 
40     // When false, the consumer is responsible for all calls into the stream
41     // that advance it.
42     private boolean mAutoAdvance = true;
43 
44     // The executor thread will block on this after reaching a terminal method.
45     // Terminal methods are (onSucceeded, onFailed or onCancelled)
46     private ConditionVariable mBlockOnTerminalState = new ConditionVariable(true);
47 
48     // Conditionally fail on certain steps.
49     private FailureType mFailureType = FailureType.NONE;
50     private ResponseStep mFailureStep = ResponseStep.NOTHING;
51 
52     // Signals when the stream is done either successfully or not.
53     private final ConditionVariable mDone = new ConditionVariable();
54 
55     // Signaled on each step when mAutoAdvance is false.
56     private final ConditionVariable mReadStepBlock = new ConditionVariable();
57     private final ConditionVariable mWriteStepBlock = new ConditionVariable();
58 
59     // Executor Service for Cronet callbacks.
60     private final ExecutorService mExecutorService =
61             Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
62     private Thread mExecutorThread;
63 
64     // position() of ByteBuffer prior to read() call.
65     private int mBufferPositionBeforeRead;
66 
67     // Data to write.
68     private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();
69 
70     // Buffers that we yet to receive the corresponding onWriteCompleted callback.
71     private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteBuffer>();
72 
73     // Whether to use a direct executor.
74     private final boolean mUseDirectExecutor;
75     private final DirectExecutor mDirectExecutor;
76 
77     private class ExecutorThreadFactory implements ThreadFactory {
78         @Override
newThread(Runnable r)79         public Thread newThread(Runnable r) {
80             mExecutorThread = new Thread(r);
81             return mExecutorThread;
82         }
83     }
84 
85     private static class WriteBuffer {
86         final ByteBuffer mBuffer;
87         final boolean mFlush;
88 
WriteBuffer(ByteBuffer buffer, boolean flush)89         public WriteBuffer(ByteBuffer buffer, boolean flush) {
90             mBuffer = buffer;
91             mFlush = flush;
92         }
93     }
94 
95     private static class DirectExecutor implements Executor {
96         @Override
execute(Runnable task)97         public void execute(Runnable task) {
98             task.run();
99         }
100     }
101 
102     public enum ResponseStep {
103         NOTHING,
104         ON_STREAM_READY,
105         ON_RESPONSE_STARTED,
106         ON_READ_COMPLETED,
107         ON_WRITE_COMPLETED,
108         ON_TRAILERS,
109         ON_CANCELED,
110         ON_FAILED,
111         ON_SUCCEEDED,
112     }
113 
114     public enum FailureType {
115         NONE,
116         CANCEL_SYNC,
117         CANCEL_ASYNC,
118         // Same as above, but continues to advance the stream after posting
119         // the cancellation task.
120         CANCEL_ASYNC_WITHOUT_PAUSE,
121         THROW_SYNC
122     }
123 
TestBidirectionalStreamCallback()124     public TestBidirectionalStreamCallback() {
125         mUseDirectExecutor = false;
126         mDirectExecutor = null;
127     }
128 
TestBidirectionalStreamCallback(boolean useDirectExecutor)129     public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
130         mUseDirectExecutor = useDirectExecutor;
131         mDirectExecutor = new DirectExecutor();
132     }
133 
134     /**
135      * This blocks the callback executor thread once it has reached a final state callback.
136      * In order to continue execution, this method must be called again and providing {@code false}
137      * to continue execution.
138      * @param blockOnTerminalState the state to set for the executor thread
139      */
setBlockOnTerminalState(boolean blockOnTerminalState)140     public void setBlockOnTerminalState(boolean blockOnTerminalState) {
141         if (blockOnTerminalState) {
142             mBlockOnTerminalState.close();
143         } else {
144             mBlockOnTerminalState.open();
145         }
146     }
147 
setAutoAdvance(boolean autoAdvance)148     public void setAutoAdvance(boolean autoAdvance) {
149         mAutoAdvance = autoAdvance;
150     }
151 
setFailure(FailureType failureType, ResponseStep failureStep)152     public void setFailure(FailureType failureType, ResponseStep failureStep) {
153         mFailureStep = failureStep;
154         mFailureType = failureType;
155     }
156 
blockForDone()157     public void blockForDone() {
158         mDone.block();
159     }
160 
waitForNextReadStep()161     public void waitForNextReadStep() {
162         mReadStepBlock.block();
163         mReadStepBlock.close();
164     }
165 
waitForNextWriteStep()166     public void waitForNextWriteStep() {
167         mWriteStepBlock.block();
168         mWriteStepBlock.close();
169     }
170 
getExecutor()171     public Executor getExecutor() {
172         if (mUseDirectExecutor) {
173             return mDirectExecutor;
174         }
175         return mExecutorService;
176     }
177 
shutdownExecutor()178     public void shutdownExecutor() {
179         if (mUseDirectExecutor) {
180             throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
181         }
182         mExecutorService.shutdown();
183     }
184 
addWriteData(byte[] data)185     public void addWriteData(byte[] data) {
186         addWriteData(data, true);
187     }
188 
addWriteData(byte[] data, boolean flush)189     public void addWriteData(byte[] data, boolean flush) {
190         ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
191         writeBuffer.put(data);
192         writeBuffer.flip();
193         mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
194         mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
195     }
196 
197     @Override
onStreamReady(BidirectionalStream stream)198     public void onStreamReady(BidirectionalStream stream) {
199         checkOnValidThread();
200         assertThat(stream.isDone()).isFalse();
201         assertThat(mResponseStep).isEqualTo(ResponseStep.NOTHING);
202         assertThat(mError).isNull();
203         mResponseStep = ResponseStep.ON_STREAM_READY;
204         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
205             return;
206         }
207         startNextWrite(stream);
208     }
209 
210     @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)211     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
212         checkOnValidThread();
213         assertThat(stream.isDone()).isFalse();
214         assertThat(mResponseStep)
215                 .isAnyOf(
216                         ResponseStep.NOTHING,
217                         ResponseStep.ON_STREAM_READY,
218                         ResponseStep.ON_WRITE_COMPLETED);
219         assertThat(mError).isNull();
220 
221         mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
222         mResponseInfo = info;
223         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
224             return;
225         }
226         startNextRead(stream);
227     }
228 
229     @Override
onReadCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuffer, boolean endOfStream)230     public void onReadCompleted(
231             BidirectionalStream stream,
232             UrlResponseInfo info,
233             ByteBuffer byteBuffer,
234             boolean endOfStream) {
235         checkOnValidThread();
236         assertThat(stream.isDone()).isFalse();
237         assertThat(mResponseStep)
238                 .isAnyOf(
239                         ResponseStep.ON_RESPONSE_STARTED,
240                         ResponseStep.ON_READ_COMPLETED,
241                         ResponseStep.ON_WRITE_COMPLETED,
242                         ResponseStep.ON_TRAILERS);
243         assertThat(mError).isNull();
244 
245         mResponseStep = ResponseStep.ON_READ_COMPLETED;
246         mResponseInfo = info;
247 
248         final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
249         mHttpResponseDataLength += bytesRead;
250         final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
251         // Rewind byteBuffer.position() to pre-read() position.
252         byteBuffer.position(mBufferPositionBeforeRead);
253         // This restores byteBuffer.position() to its value on entrance to
254         // this function.
255         byteBuffer.get(lastDataReceivedAsBytes);
256 
257         mResponseAsString += new String(lastDataReceivedAsBytes);
258 
259         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
260             return;
261         }
262         // Do not read if EOF has been reached.
263         if (!endOfStream) {
264             startNextRead(stream);
265         }
266     }
267 
268     @Override
onWriteCompleted( BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)269     public void onWriteCompleted(
270             BidirectionalStream stream,
271             UrlResponseInfo info,
272             ByteBuffer buffer,
273             boolean endOfStream) {
274         checkOnValidThread();
275         assertThat(stream.isDone()).isFalse();
276         assertThat(mError).isNull();
277         mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
278         mResponseInfo = info;
279         if (!mWriteBuffersToBeAcked.isEmpty()) {
280             assertThat(mWriteBuffersToBeAcked.get(0).mBuffer).isEqualTo(buffer);
281             mWriteBuffersToBeAcked.remove(0);
282         }
283         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
284             return;
285         }
286         startNextWrite(stream);
287     }
288 
289     @Override
onResponseTrailersReceived( BidirectionalStream stream, UrlResponseInfo info, UrlResponseInfo.HeaderBlock trailers)290     public void onResponseTrailersReceived(
291             BidirectionalStream stream,
292             UrlResponseInfo info,
293             UrlResponseInfo.HeaderBlock trailers) {
294         checkOnValidThread();
295         assertThat(stream.isDone()).isFalse();
296         assertThat(mError).isNull();
297         mResponseStep = ResponseStep.ON_TRAILERS;
298         mResponseInfo = info;
299         mTrailers = trailers;
300         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
301             return;
302         }
303     }
304 
305     @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)306     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
307         checkOnValidThread();
308         assertThat(stream.isDone()).isTrue();
309         assertThat(mResponseStep)
310                 .isAnyOf(
311                         ResponseStep.ON_RESPONSE_STARTED,
312                         ResponseStep.ON_READ_COMPLETED,
313                         ResponseStep.ON_WRITE_COMPLETED,
314                         ResponseStep.ON_TRAILERS);
315         assertThat(mOnErrorCalled).isFalse();
316         assertThat(mOnCanceledCalled).isFalse();
317         assertThat(mError).isNull();
318         assertThat(mWriteBuffers).isEmpty();
319         assertThat(mWriteBuffersToBeAcked).isEmpty();
320 
321         mResponseStep = ResponseStep.ON_SUCCEEDED;
322         mResponseInfo = info;
323         openDone();
324         mBlockOnTerminalState.block();
325         maybeThrowCancelOrPause(stream, mReadStepBlock);
326     }
327 
328     @Override
onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error)329     public void onFailed(BidirectionalStream stream, UrlResponseInfo info, CronetException error) {
330         checkOnValidThread();
331         assertThat(stream.isDone()).isTrue();
332         // Shouldn't happen after success.
333         assertThat(mResponseStep).isNotEqualTo(ResponseStep.ON_SUCCEEDED);
334         // Should happen at most once for a single stream.
335         assertThat(mOnErrorCalled).isFalse();
336         assertThat(mOnCanceledCalled).isFalse();
337         assertThat(mError).isNull();
338         mResponseStep = ResponseStep.ON_FAILED;
339         mResponseInfo = info;
340 
341         mOnErrorCalled = true;
342         mError = error;
343         openDone();
344         mBlockOnTerminalState.block();
345         maybeThrowCancelOrPause(stream, mReadStepBlock);
346     }
347 
348     @Override
onCanceled(BidirectionalStream stream, UrlResponseInfo info)349     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
350         checkOnValidThread();
351         assertThat(stream.isDone()).isTrue();
352         // Should happen at most once for a single stream.
353         assertThat(mOnCanceledCalled).isFalse();
354         assertThat(mOnErrorCalled).isFalse();
355         assertThat(mError).isNull();
356         mResponseStep = ResponseStep.ON_CANCELED;
357         mResponseInfo = info;
358 
359         mOnCanceledCalled = true;
360         openDone();
361         mBlockOnTerminalState.block();
362         maybeThrowCancelOrPause(stream, mReadStepBlock);
363     }
364 
startNextRead(BidirectionalStream stream)365     public void startNextRead(BidirectionalStream stream) {
366         startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
367     }
368 
startNextRead(BidirectionalStream stream, ByteBuffer buffer)369     public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
370         mBufferPositionBeforeRead = buffer.position();
371         stream.read(buffer);
372     }
373 
startNextWrite(BidirectionalStream stream)374     public void startNextWrite(BidirectionalStream stream) {
375         if (!mWriteBuffers.isEmpty()) {
376             Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
377             while (iterator.hasNext()) {
378                 WriteBuffer b = iterator.next();
379                 stream.write(b.mBuffer, !iterator.hasNext());
380                 iterator.remove();
381                 if (b.mFlush) {
382                     stream.flush();
383                     break;
384                 }
385             }
386         }
387     }
388 
isDone()389     public boolean isDone() {
390         // It's not mentioned by the Android docs, but block(0) seems to block
391         // indefinitely, so have to block for one millisecond to get state
392         // without blocking.
393         return mDone.block(1);
394     }
395 
396     /** Returns the number of pending Writes. */
numPendingWrites()397     public int numPendingWrites() {
398         return mWriteBuffers.size();
399     }
400 
401     /**
402      * Asserts that there is no callback error before trying to access responseInfo. Only use this
403      * when you expect {@code mError} to be null.
404      * @return {@link UrlResponseInfo}
405      */
getResponseInfoWithChecks()406     public UrlResponseInfo getResponseInfoWithChecks() {
407         assertThat(mError).isNull();
408         assertThat(mOnErrorCalled).isFalse();
409         assertThat(mResponseInfo).isNotNull();
410         return mResponseInfo;
411     }
412 
413     /**
414      * Simply returns {@code mResponseInfo} with no nullability or error checks.
415      * @return {@link UrlResponseInfo}
416      */
getResponseInfo()417     public UrlResponseInfo getResponseInfo() {
418         return mResponseInfo;
419     }
420 
openDone()421     protected void openDone() {
422         mDone.open();
423     }
424 
425     /**
426      * Returns {@code false} if the callback should continue to advance the
427      * stream.
428      */
maybeThrowCancelOrPause( final BidirectionalStream stream, ConditionVariable stepBlock)429     private boolean maybeThrowCancelOrPause(
430             final BidirectionalStream stream, ConditionVariable stepBlock) {
431         if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
432             if (!mAutoAdvance) {
433                 stepBlock.open();
434                 return true;
435             }
436             return false;
437         }
438 
439         if (mFailureType == FailureType.THROW_SYNC) {
440             throw new IllegalStateException("Callback Exception.");
441         }
442         Runnable task =
443                 new Runnable() {
444                     @Override
445                     public void run() {
446                         stream.cancel();
447                     }
448                 };
449         if (mFailureType == FailureType.CANCEL_ASYNC
450                 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
451             getExecutor().execute(task);
452         } else {
453             task.run();
454         }
455         return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
456     }
457 
458     /** Checks whether callback methods are invoked on the correct thread. */
checkOnValidThread()459     private void checkOnValidThread() {
460         if (!mUseDirectExecutor) {
461             assertThat(Thread.currentThread()).isEqualTo(mExecutorThread);
462         }
463     }
464 }
465