• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net;
6 
7 import android.net.http.BidirectionalStream;
8 import android.net.http.HttpException;
9 import android.net.http.HeaderBlock;
10 import android.net.http.UrlResponseInfo;
11 import android.os.ConditionVariable;
12 
13 import static junit.framework.Assert.assertEquals;
14 import static junit.framework.Assert.assertFalse;
15 import static junit.framework.Assert.assertNull;
16 import static junit.framework.Assert.assertTrue;
17 
18 import java.nio.ByteBuffer;
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 
26 /**
27  * Callback that tracks information from different callbacks and and has a
28  * method to block thread until the stream completes on another thread.
29  * Allows to cancel, block stream or throw an exception from an arbitrary step.
30  */
31 public class TestBidirectionalStreamCallback implements BidirectionalStream.Callback {
32     public UrlResponseInfo mResponseInfo;
33     public HttpException mError;
34 
35     public ResponseStep mResponseStep = ResponseStep.NOTHING;
36 
37     public boolean mOnErrorCalled;
38     public boolean mOnCanceledCalled;
39 
40     public int mHttpResponseDataLength;
41     public String mResponseAsString = "";
42 
43     public HeaderBlock mTrailers;
44 
45     private static final int READ_BUFFER_SIZE = 32 * 1024;
46 
47     // When false, the consumer is responsible for all calls into the stream
48     // that advance it.
49     private boolean mAutoAdvance = true;
50 
51     // Conditionally fail on certain steps.
52     private FailureType mFailureType = FailureType.NONE;
53     private ResponseStep mFailureStep = ResponseStep.NOTHING;
54 
55     // Signals when the stream is done either successfully or not.
56     private final ConditionVariable mDone = new ConditionVariable();
57 
58     // Signaled on each step when mAutoAdvance is false.
59     private final ConditionVariable mReadStepBlock = new ConditionVariable();
60     private final ConditionVariable mWriteStepBlock = new ConditionVariable();
61 
62     // Executor Service for Cronet callbacks.
63     private final ExecutorService mExecutorService =
64             Executors.newSingleThreadExecutor(new ExecutorThreadFactory());
65     private Thread mExecutorThread;
66 
67     // position() of ByteBuffer prior to read() call.
68     private int mBufferPositionBeforeRead;
69 
70     // Data to write.
71     private final ArrayList<WriteBuffer> mWriteBuffers = new ArrayList<WriteBuffer>();
72 
73     // Buffers that we yet to receive the corresponding onWriteCompleted callback.
74     private final ArrayList<WriteBuffer> mWriteBuffersToBeAcked = new ArrayList<WriteBuffer>();
75 
76     // Whether to use a direct executor.
77     private final boolean mUseDirectExecutor;
78     private final DirectExecutor mDirectExecutor;
79 
80     private class ExecutorThreadFactory implements ThreadFactory {
81         @Override
newThread(Runnable r)82         public Thread newThread(Runnable r) {
83             mExecutorThread = new Thread(r);
84             return mExecutorThread;
85         }
86     }
87 
88     private static class WriteBuffer {
89         final ByteBuffer mBuffer;
90         final boolean mFlush;
WriteBuffer(ByteBuffer buffer, boolean flush)91         public WriteBuffer(ByteBuffer buffer, boolean flush) {
92             mBuffer = buffer;
93             mFlush = flush;
94         }
95     }
96 
97     private static class DirectExecutor implements Executor {
98         @Override
execute(Runnable task)99         public void execute(Runnable task) {
100             task.run();
101         }
102     }
103 
104     public enum ResponseStep {
105         NOTHING,
106         ON_STREAM_READY,
107         ON_RESPONSE_STARTED,
108         ON_READ_COMPLETED,
109         ON_WRITE_COMPLETED,
110         ON_TRAILERS,
111         ON_CANCELED,
112         ON_FAILED,
113         ON_SUCCEEDED,
114     }
115 
116     public enum FailureType {
117         NONE,
118         CANCEL_SYNC,
119         CANCEL_ASYNC,
120         // Same as above, but continues to advance the stream after posting
121         // the cancellation task.
122         CANCEL_ASYNC_WITHOUT_PAUSE,
123         THROW_SYNC
124     }
125 
TestBidirectionalStreamCallback()126     public TestBidirectionalStreamCallback() {
127         mUseDirectExecutor = false;
128         mDirectExecutor = null;
129     }
130 
TestBidirectionalStreamCallback(boolean useDirectExecutor)131     public TestBidirectionalStreamCallback(boolean useDirectExecutor) {
132         mUseDirectExecutor = useDirectExecutor;
133         mDirectExecutor = new DirectExecutor();
134     }
135 
setAutoAdvance(boolean autoAdvance)136     public void setAutoAdvance(boolean autoAdvance) {
137         mAutoAdvance = autoAdvance;
138     }
139 
setFailure(FailureType failureType, ResponseStep failureStep)140     public void setFailure(FailureType failureType, ResponseStep failureStep) {
141         mFailureStep = failureStep;
142         mFailureType = failureType;
143     }
144 
blockForDone()145     public void blockForDone() {
146         mDone.block();
147     }
148 
waitForNextReadStep()149     public void waitForNextReadStep() {
150         mReadStepBlock.block();
151         mReadStepBlock.close();
152     }
153 
waitForNextWriteStep()154     public void waitForNextWriteStep() {
155         mWriteStepBlock.block();
156         mWriteStepBlock.close();
157     }
158 
getExecutor()159     public Executor getExecutor() {
160         if (mUseDirectExecutor) {
161             return mDirectExecutor;
162         }
163         return mExecutorService;
164     }
165 
shutdownExecutor()166     public void shutdownExecutor() {
167         if (mUseDirectExecutor) {
168             throw new UnsupportedOperationException("DirectExecutor doesn't support shutdown");
169         }
170         mExecutorService.shutdown();
171     }
172 
addWriteData(byte[] data)173     public void addWriteData(byte[] data) {
174         addWriteData(data, true);
175     }
176 
addWriteData(byte[] data, boolean flush)177     public void addWriteData(byte[] data, boolean flush) {
178         ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
179         writeBuffer.put(data);
180         writeBuffer.flip();
181         mWriteBuffers.add(new WriteBuffer(writeBuffer, flush));
182         mWriteBuffersToBeAcked.add(new WriteBuffer(writeBuffer, flush));
183     }
184 
185     @Override
onStreamReady(BidirectionalStream stream)186     public void onStreamReady(BidirectionalStream stream) {
187         checkOnValidThread();
188         assertFalse(stream.isDone());
189         assertEquals(ResponseStep.NOTHING, mResponseStep);
190         assertNull(mError);
191         mResponseStep = ResponseStep.ON_STREAM_READY;
192         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
193             return;
194         }
195         startNextWrite(stream);
196     }
197 
198     @Override
onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info)199     public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
200         checkOnValidThread();
201         assertFalse(stream.isDone());
202         assertTrue(mResponseStep == ResponseStep.NOTHING
203                 || mResponseStep == ResponseStep.ON_STREAM_READY
204                 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED);
205         assertNull(mError);
206 
207         mResponseStep = ResponseStep.ON_RESPONSE_STARTED;
208         mResponseInfo = info;
209         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
210             return;
211         }
212         startNextRead(stream);
213     }
214 
215     @Override
onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer byteBuffer, boolean endOfStream)216     public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
217             ByteBuffer byteBuffer, boolean endOfStream) {
218         checkOnValidThread();
219         assertFalse(stream.isDone());
220         assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
221                 || mResponseStep == ResponseStep.ON_READ_COMPLETED
222                 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
223                 || mResponseStep == ResponseStep.ON_TRAILERS);
224         assertNull(mError);
225 
226         mResponseStep = ResponseStep.ON_READ_COMPLETED;
227         mResponseInfo = info;
228 
229         final int bytesRead = byteBuffer.position() - mBufferPositionBeforeRead;
230         mHttpResponseDataLength += bytesRead;
231         final byte[] lastDataReceivedAsBytes = new byte[bytesRead];
232         // Rewind byteBuffer.position() to pre-read() position.
233         byteBuffer.position(mBufferPositionBeforeRead);
234         // This restores byteBuffer.position() to its value on entrance to
235         // this function.
236         byteBuffer.get(lastDataReceivedAsBytes);
237 
238         mResponseAsString += new String(lastDataReceivedAsBytes);
239 
240         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
241             return;
242         }
243         // Do not read if EOF has been reached.
244         if (!endOfStream) {
245             startNextRead(stream);
246         }
247     }
248 
249     @Override
onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, ByteBuffer buffer, boolean endOfStream)250     public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
251             ByteBuffer buffer, boolean endOfStream) {
252         checkOnValidThread();
253         assertFalse(stream.isDone());
254         assertNull(mError);
255         mResponseStep = ResponseStep.ON_WRITE_COMPLETED;
256         mResponseInfo = info;
257         if (!mWriteBuffersToBeAcked.isEmpty()) {
258             assertEquals(buffer, mWriteBuffersToBeAcked.get(0).mBuffer);
259             mWriteBuffersToBeAcked.remove(0);
260         }
261         if (maybeThrowCancelOrPause(stream, mWriteStepBlock)) {
262             return;
263         }
264         startNextWrite(stream);
265     }
266 
267     @Override
onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, HeaderBlock trailers)268     public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
269             HeaderBlock trailers) {
270         checkOnValidThread();
271         assertFalse(stream.isDone());
272         assertNull(mError);
273         mResponseStep = ResponseStep.ON_TRAILERS;
274         mResponseInfo = info;
275         mTrailers = trailers;
276         if (maybeThrowCancelOrPause(stream, mReadStepBlock)) {
277             return;
278         }
279     }
280 
281     @Override
onSucceeded(BidirectionalStream stream, UrlResponseInfo info)282     public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
283         checkOnValidThread();
284         assertTrue(stream.isDone());
285         assertTrue(mResponseStep == ResponseStep.ON_RESPONSE_STARTED
286                 || mResponseStep == ResponseStep.ON_READ_COMPLETED
287                 || mResponseStep == ResponseStep.ON_WRITE_COMPLETED
288                 || mResponseStep == ResponseStep.ON_TRAILERS);
289         assertFalse(mOnErrorCalled);
290         assertFalse(mOnCanceledCalled);
291         assertNull(mError);
292         assertEquals(0, mWriteBuffers.size());
293         assertEquals(0, mWriteBuffersToBeAcked.size());
294 
295         mResponseStep = ResponseStep.ON_SUCCEEDED;
296         mResponseInfo = info;
297         openDone();
298         maybeThrowCancelOrPause(stream, mReadStepBlock);
299     }
300 
301     @Override
onFailed(BidirectionalStream stream, UrlResponseInfo info, HttpException error)302     public void onFailed(BidirectionalStream stream, UrlResponseInfo info, HttpException error) {
303         checkOnValidThread();
304         assertTrue(stream.isDone());
305         // Shouldn't happen after success.
306         assertTrue(mResponseStep != ResponseStep.ON_SUCCEEDED);
307         // Should happen at most once for a single stream.
308         assertFalse(mOnErrorCalled);
309         assertFalse(mOnCanceledCalled);
310         assertNull(mError);
311         mResponseStep = ResponseStep.ON_FAILED;
312         mResponseInfo = info;
313 
314         mOnErrorCalled = true;
315         mError = error;
316         openDone();
317         maybeThrowCancelOrPause(stream, mReadStepBlock);
318     }
319 
320     @Override
onCanceled(BidirectionalStream stream, UrlResponseInfo info)321     public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
322         checkOnValidThread();
323         assertTrue(stream.isDone());
324         // Should happen at most once for a single stream.
325         assertFalse(mOnCanceledCalled);
326         assertFalse(mOnErrorCalled);
327         assertNull(mError);
328         mResponseStep = ResponseStep.ON_CANCELED;
329         mResponseInfo = info;
330 
331         mOnCanceledCalled = true;
332         openDone();
333         maybeThrowCancelOrPause(stream, mReadStepBlock);
334     }
335 
startNextRead(BidirectionalStream stream)336     public void startNextRead(BidirectionalStream stream) {
337         startNextRead(stream, ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
338     }
339 
startNextRead(BidirectionalStream stream, ByteBuffer buffer)340     public void startNextRead(BidirectionalStream stream, ByteBuffer buffer) {
341         mBufferPositionBeforeRead = buffer.position();
342         stream.read(buffer);
343     }
344 
startNextWrite(BidirectionalStream stream)345     public void startNextWrite(BidirectionalStream stream) {
346         if (!mWriteBuffers.isEmpty()) {
347             Iterator<WriteBuffer> iterator = mWriteBuffers.iterator();
348             while (iterator.hasNext()) {
349                 WriteBuffer b = iterator.next();
350                 stream.write(b.mBuffer, !iterator.hasNext());
351                 iterator.remove();
352                 if (b.mFlush) {
353                     stream.flush();
354                     break;
355                 }
356             }
357         }
358     }
359 
isDone()360     public boolean isDone() {
361         // It's not mentioned by the Android docs, but block(0) seems to block
362         // indefinitely, so have to block for one millisecond to get state
363         // without blocking.
364         return mDone.block(1);
365     }
366 
367     /**
368      * Returns the number of pending Writes.
369      */
numPendingWrites()370     public int numPendingWrites() {
371         return mWriteBuffers.size();
372     }
373 
openDone()374     protected void openDone() {
375         mDone.open();
376     }
377 
378     /**
379      * Returns {@code false} if the callback should continue to advance the
380      * stream.
381      */
maybeThrowCancelOrPause( final BidirectionalStream stream, ConditionVariable stepBlock)382     private boolean maybeThrowCancelOrPause(
383             final BidirectionalStream stream, ConditionVariable stepBlock) {
384         if (mResponseStep != mFailureStep || mFailureType == FailureType.NONE) {
385             if (!mAutoAdvance) {
386                 stepBlock.open();
387                 return true;
388             }
389             return false;
390         }
391 
392         if (mFailureType == FailureType.THROW_SYNC) {
393             throw new IllegalStateException("Callback Exception.");
394         }
395         Runnable task = new Runnable() {
396             @Override
397             public void run() {
398                 stream.cancel();
399             }
400         };
401         if (mFailureType == FailureType.CANCEL_ASYNC
402                 || mFailureType == FailureType.CANCEL_ASYNC_WITHOUT_PAUSE) {
403             getExecutor().execute(task);
404         } else {
405             task.run();
406         }
407         return mFailureType != FailureType.CANCEL_ASYNC_WITHOUT_PAUSE;
408     }
409 
410     /**
411      * Checks whether callback methods are invoked on the correct thread.
412      */
checkOnValidThread()413     private void checkOnValidThread() {
414         if (!mUseDirectExecutor) {
415             assertEquals(mExecutorThread, Thread.currentThread());
416         }
417     }
418 }
419