• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net.impl;
6 
7 import android.annotation.SuppressLint;
8 
9 import androidx.annotation.IntDef;
10 import androidx.annotation.VisibleForTesting;
11 
12 import org.jni_zero.CalledByNative;
13 import org.jni_zero.JNINamespace;
14 import org.jni_zero.NativeClassQualifiedName;
15 import org.jni_zero.NativeMethods;
16 
17 import org.chromium.base.Log;
18 import org.chromium.net.UploadDataProvider;
19 import org.chromium.net.UploadDataSink;
20 
21 import java.io.IOException;
22 import java.lang.annotation.Retention;
23 import java.lang.annotation.RetentionPolicy;
24 import java.nio.ByteBuffer;
25 import java.util.concurrent.Executor;
26 
27 import javax.annotation.concurrent.GuardedBy;
28 
29 /**
30  * CronetUploadDataStream handles communication between an upload body
31  * encapsulated in the embedder's {@link UploadDataSink} and a C++
32  * UploadDataStreamAdapter, which it owns. It's attached to a {@link
33  * CronetUrlRequest}'s during the construction of request's native C++ objects
34  * on the network thread, though it's created on one of the embedder's threads.
35  * It is called by the UploadDataStreamAdapter on the network thread, but calls
36  * into the UploadDataSink and the UploadDataStreamAdapter on the Executor
37  * passed into its constructor.
38  */
39 @JNINamespace("cronet")
40 @VisibleForTesting
41 public final class CronetUploadDataStream extends UploadDataSink {
42     private static final String TAG = CronetUploadDataStream.class.getSimpleName();
43     // These are never changed, once a request starts.
44     private final Executor mExecutor;
45     private final VersionSafeCallbacks.UploadDataProviderWrapper mDataProvider;
46     private final CronetUrlRequest mRequest;
47     private long mLength;
48     private long mRemainingLength;
49     private long mByteBufferLimit;
50 
51     // Reusable read task, to reduce redundant memory allocation.
52     private final Runnable mReadTask =
53             new Runnable() {
54                 @Override
55                 public void run() {
56                     synchronized (mLock) {
57                         if (mUploadDataStreamAdapter == 0) {
58                             return;
59                         }
60                         checkState(UserCallback.NOT_IN_CALLBACK);
61                         if (mByteBuffer == null) {
62                             throw new IllegalStateException(
63                                     "Unexpected readData call. Buffer is null");
64                         }
65                         mInWhichUserCallback = UserCallback.READ;
66                     }
67                     try {
68                         checkCallingThread();
69                         assert mByteBuffer.position() == 0;
70                         mDataProvider.read(CronetUploadDataStream.this, mByteBuffer);
71                     } catch (Exception exception) {
72                         onError(exception);
73                     }
74                 }
75             };
76 
77     // ByteBuffer created in the native code and passed to
78     // UploadDataProvider for reading. It is only valid from the
79     // call to mDataProvider.read until onError or onReadSucceeded.
80     private ByteBuffer mByteBuffer;
81 
82     // Lock that protects all subsequent variables. The adapter has to be
83     // protected to ensure safe shutdown, mReading and mRewinding are protected
84     // to robustly detect getting read/rewind results more often than expected.
85     private final Object mLock = new Object();
86 
87     // Native adapter object, owned by the CronetUploadDataStream. It's only
88     // deleted after the native UploadDataStream object is destroyed. All access
89     // to the adapter is synchronized, for safe usage and cleanup.
90     @GuardedBy("mLock")
91     private long mUploadDataStreamAdapter;
92 
93     @IntDef({
94         UserCallback.READ,
95         UserCallback.REWIND,
96         UserCallback.GET_LENGTH,
97         UserCallback.NOT_IN_CALLBACK
98     })
99     @Retention(RetentionPolicy.SOURCE)
100     private @interface UserCallback {
101         int READ = 0;
102         int REWIND = 1;
103         int GET_LENGTH = 2;
104         int NOT_IN_CALLBACK = 3;
105     }
106 
107     @GuardedBy("mLock")
108     private @UserCallback int mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
109 
110     @GuardedBy("mLock")
111     private boolean mDestroyAdapterPostponed;
112 
113     private Runnable mOnDestroyedCallbackForTesting;
114 
115     /**
116      * Constructs a CronetUploadDataStream.
117      * @param dataProvider the UploadDataProvider to read data from.
118      * @param executor the Executor to execute UploadDataProvider tasks.
119      */
CronetUploadDataStream( UploadDataProvider dataProvider, Executor executor, CronetUrlRequest request)120     public CronetUploadDataStream(
121             UploadDataProvider dataProvider, Executor executor, CronetUrlRequest request) {
122         mExecutor = executor;
123         mDataProvider = new VersionSafeCallbacks.UploadDataProviderWrapper(dataProvider);
124         mRequest = request;
125     }
126 
127     /**
128      * Called by native code to make the UploadDataProvider read data into
129      * {@code byteBuffer}.
130      */
131     @SuppressWarnings("unused")
132     @CalledByNative
readData(ByteBuffer byteBuffer)133     void readData(ByteBuffer byteBuffer) {
134         mByteBuffer = byteBuffer;
135         mByteBufferLimit = byteBuffer.limit();
136         postTaskToExecutor(mReadTask);
137     }
138 
139     // TODO(mmenke): Consider implementing a cancel method.
140     // currently wait for any pending read to complete.
141 
142     /** Called by native code to make the UploadDataProvider rewind upload data. */
143     @SuppressWarnings("unused")
144     @CalledByNative
rewind()145     void rewind() {
146         Runnable task =
147                 new Runnable() {
148                     @Override
149                     public void run() {
150                         synchronized (mLock) {
151                             if (mUploadDataStreamAdapter == 0) {
152                                 return;
153                             }
154                             checkState(UserCallback.NOT_IN_CALLBACK);
155                             mInWhichUserCallback = UserCallback.REWIND;
156                         }
157                         try {
158                             checkCallingThread();
159                             mDataProvider.rewind(CronetUploadDataStream.this);
160                         } catch (Exception exception) {
161                             onError(exception);
162                         }
163                     }
164                 };
165         postTaskToExecutor(task);
166     }
167 
checkCallingThread()168     private void checkCallingThread() {
169         mRequest.checkCallingThread();
170     }
171 
172     @GuardedBy("mLock")
checkState(@serCallback int mode)173     private void checkState(@UserCallback int mode) {
174         if (mInWhichUserCallback != mode) {
175             throw new IllegalStateException(
176                     "Expected " + mode + ", but was " + mInWhichUserCallback);
177         }
178     }
179 
180     /**
181      * Called when the native UploadDataStream is destroyed.  At this point,
182      * the native adapter needs to be destroyed, but only after any pending
183      * read operation completes, as the adapter owns the read buffer.
184      */
185     @SuppressWarnings("unused")
186     @CalledByNative
onUploadDataStreamDestroyed()187     void onUploadDataStreamDestroyed() {
188         destroyAdapter();
189     }
190 
191     /**
192      * Helper method called when an exception occurred. This method resets
193      * states and propagates the error to the request.
194      */
onError(Throwable exception)195     private void onError(Throwable exception) {
196         final boolean sendClose;
197         synchronized (mLock) {
198             if (mInWhichUserCallback == UserCallback.NOT_IN_CALLBACK) {
199                 throw new IllegalStateException(
200                         "There is no read or rewind or length check in progress.", exception);
201             }
202             sendClose = mInWhichUserCallback == UserCallback.GET_LENGTH;
203             mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
204             mByteBuffer = null;
205             destroyAdapterIfPostponed();
206         }
207         // Failure before length is obtained means that the request has failed before the
208         // adapter has been initialized. Close the UploadDataProvider. This is safe to call
209         // here since failure during getLength can only happen on the user's executor.
210         if (sendClose) {
211             try {
212                 mDataProvider.close();
213             } catch (Exception e) {
214                 Log.e(TAG, "Failure closing data provider", e);
215             }
216         }
217 
218         // Just fail the request - simpler to fail directly, and
219         // UploadDataStream only supports failing during initialization, not
220         // while reading. The request is smart enough to handle the case where
221         // it was already canceled by the embedder.
222         mRequest.onUploadException(exception);
223     }
224 
225     @Override
226     @SuppressLint("DefaultLocale")
onReadSucceeded(boolean lastChunk)227     public void onReadSucceeded(boolean lastChunk) {
228         synchronized (mLock) {
229             checkState(UserCallback.READ);
230             if (mByteBufferLimit != mByteBuffer.limit()) {
231                 throw new IllegalStateException("ByteBuffer limit changed");
232             }
233             if (lastChunk && mLength >= 0) {
234                 throw new IllegalArgumentException("Non-chunked upload can't have last chunk");
235             }
236             int bytesRead = mByteBuffer.position();
237             mRemainingLength -= bytesRead;
238             if (mRemainingLength < 0 && mLength >= 0) {
239                 throw new IllegalArgumentException(
240                         String.format(
241                                 "Read upload data length %d exceeds expected length %d",
242                                 mLength - mRemainingLength, mLength));
243             }
244             mByteBuffer.position(0);
245             mByteBuffer = null;
246             mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
247 
248             destroyAdapterIfPostponed();
249             // Request may been canceled already.
250             if (mUploadDataStreamAdapter == 0) {
251                 return;
252             }
253             CronetUploadDataStreamJni.get()
254                     .onReadSucceeded(
255                             mUploadDataStreamAdapter,
256                             CronetUploadDataStream.this,
257                             bytesRead,
258                             lastChunk);
259         }
260     }
261 
262     @Override
onReadError(Exception exception)263     public void onReadError(Exception exception) {
264         synchronized (mLock) {
265             checkState(UserCallback.READ);
266             onError(exception);
267         }
268     }
269 
270     @Override
onRewindSucceeded()271     public void onRewindSucceeded() {
272         synchronized (mLock) {
273             checkState(UserCallback.REWIND);
274             mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
275             mRemainingLength = mLength;
276             // Request may been canceled already.
277             if (mUploadDataStreamAdapter == 0) {
278                 return;
279             }
280             CronetUploadDataStreamJni.get()
281                     .onRewindSucceeded(mUploadDataStreamAdapter, CronetUploadDataStream.this);
282         }
283     }
284 
285     @Override
onRewindError(Exception exception)286     public void onRewindError(Exception exception) {
287         synchronized (mLock) {
288             checkState(UserCallback.REWIND);
289             onError(exception);
290         }
291     }
292 
293     /** Posts task to application Executor. */
postTaskToExecutor(Runnable task)294     void postTaskToExecutor(Runnable task) {
295         try {
296             mExecutor.execute(task);
297         } catch (Throwable e) {
298             // Just fail the request. The request is smart enough to handle the
299             // case where it was already canceled by the embedder.
300             mRequest.onUploadException(e);
301         }
302     }
303 
304     /**
305      * The adapter is owned by the CronetUploadDataStream, so it can be
306      * destroyed safely when there is no pending read; however, destruction is
307      * initiated by the destruction of the native UploadDataStream.
308      */
destroyAdapter()309     private void destroyAdapter() {
310         synchronized (mLock) {
311             if (mInWhichUserCallback == UserCallback.READ) {
312                 // Wait for the read to complete before destroy the adapter.
313                 mDestroyAdapterPostponed = true;
314                 return;
315             }
316             if (mUploadDataStreamAdapter == 0) {
317                 return;
318             }
319             CronetUploadDataStreamJni.get().destroy(mUploadDataStreamAdapter);
320             mUploadDataStreamAdapter = 0;
321             if (mOnDestroyedCallbackForTesting != null) {
322                 mOnDestroyedCallbackForTesting.run();
323             }
324         }
325         postTaskToExecutor(
326                 new Runnable() {
327                     @Override
328                     public void run() {
329                         try {
330                             checkCallingThread();
331                             mDataProvider.close();
332                         } catch (Exception e) {
333                             Log.e(TAG, "Exception thrown when closing", e);
334                         }
335                     }
336                 });
337     }
338 
339     /**
340      * Destroys the native adapter if the destruction is postponed due to a
341      * pending read, which has since completed. Caller needs to be on executor
342      * thread.
343      */
destroyAdapterIfPostponed()344     private void destroyAdapterIfPostponed() {
345         synchronized (mLock) {
346             if (mInWhichUserCallback == UserCallback.READ) {
347                 throw new IllegalStateException(
348                         "Method should not be called when read has not completed.");
349             }
350             if (mDestroyAdapterPostponed) {
351                 destroyAdapter();
352             }
353         }
354     }
355 
356     /**
357      * Initializes upload length by getting it from data provider. Submits to
358      * the user's executor thread to allow getLength() to block and/or report errors.
359      * If data provider throws an exception, then it is reported to the request.
360      * No native calls to urlRequest are allowed as this is done before request
361      * start, so native object may not exist.
362      */
initializeWithRequest()363     void initializeWithRequest() {
364         synchronized (mLock) {
365             mInWhichUserCallback = UserCallback.GET_LENGTH;
366         }
367         try {
368             mRequest.checkCallingThread();
369             mLength = mDataProvider.getLength();
370             mRemainingLength = mLength;
371         } catch (Throwable t) {
372             onError(t);
373         }
374         synchronized (mLock) {
375             mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
376         }
377     }
378 
379     /**
380      * Creates native objects and attaches them to the underlying request
381      * adapter object. Always called on executor thread.
382      */
attachNativeAdapterToRequest(final long requestAdapter)383     void attachNativeAdapterToRequest(final long requestAdapter) {
384         synchronized (mLock) {
385             mUploadDataStreamAdapter =
386                     CronetUploadDataStreamJni.get()
387                             .attachUploadDataToRequest(
388                                     CronetUploadDataStream.this, requestAdapter, mLength);
389         }
390     }
391 
392     /**
393      * Creates a native CronetUploadDataStreamAdapter and
394      * CronetUploadDataStream for testing.
395      * @return the address of the native CronetUploadDataStream object.
396      */
createUploadDataStreamForTesting()397     public long createUploadDataStreamForTesting() throws IOException {
398         synchronized (mLock) {
399             mUploadDataStreamAdapter =
400                     CronetUploadDataStreamJni.get()
401                             .createAdapterForTesting(CronetUploadDataStream.this);
402             mLength = mDataProvider.getLength();
403             mRemainingLength = mLength;
404             return CronetUploadDataStreamJni.get()
405                     .createUploadDataStreamForTesting(
406                             CronetUploadDataStream.this, mLength, mUploadDataStreamAdapter);
407         }
408     }
409 
410     @VisibleForTesting
setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting)411     public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) {
412         mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
413     }
414 
415     // Native methods are implemented in upload_data_stream_adapter.cc.
416     @NativeMethods
417     interface Natives {
attachUploadDataToRequest( CronetUploadDataStream caller, long urlRequestAdapter, long length)418         long attachUploadDataToRequest(
419                 CronetUploadDataStream caller, long urlRequestAdapter, long length);
420 
createAdapterForTesting(CronetUploadDataStream caller)421         long createAdapterForTesting(CronetUploadDataStream caller);
422 
createUploadDataStreamForTesting( CronetUploadDataStream caller, long length, long adapter)423         long createUploadDataStreamForTesting(
424                 CronetUploadDataStream caller, long length, long adapter);
425 
426         @NativeClassQualifiedName("CronetUploadDataStreamAdapter")
onReadSucceeded( long nativePtr, CronetUploadDataStream caller, int bytesRead, boolean finalChunk)427         void onReadSucceeded(
428                 long nativePtr, CronetUploadDataStream caller, int bytesRead, boolean finalChunk);
429 
430         @NativeClassQualifiedName("CronetUploadDataStreamAdapter")
onRewindSucceeded(long nativePtr, CronetUploadDataStream caller)431         void onRewindSucceeded(long nativePtr, CronetUploadDataStream caller);
432 
433         @NativeClassQualifiedName("CronetUploadDataStreamAdapter")
destroy(long nativePtr)434         void destroy(long nativePtr);
435     }
436 }
437