1 // Copyright 2015 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.net.impl; 6 7 import android.annotation.SuppressLint; 8 9 import androidx.annotation.IntDef; 10 import androidx.annotation.VisibleForTesting; 11 12 import org.jni_zero.CalledByNative; 13 import org.jni_zero.JNINamespace; 14 import org.jni_zero.NativeClassQualifiedName; 15 import org.jni_zero.NativeMethods; 16 17 import org.chromium.base.Log; 18 import org.chromium.net.UploadDataProvider; 19 import org.chromium.net.UploadDataSink; 20 21 import java.io.IOException; 22 import java.lang.annotation.Retention; 23 import java.lang.annotation.RetentionPolicy; 24 import java.nio.ByteBuffer; 25 import java.util.concurrent.Executor; 26 27 import javax.annotation.concurrent.GuardedBy; 28 29 /** 30 * CronetUploadDataStream handles communication between an upload body 31 * encapsulated in the embedder's {@link UploadDataSink} and a C++ 32 * UploadDataStreamAdapter, which it owns. It's attached to a {@link 33 * CronetUrlRequest}'s during the construction of request's native C++ objects 34 * on the network thread, though it's created on one of the embedder's threads. 35 * It is called by the UploadDataStreamAdapter on the network thread, but calls 36 * into the UploadDataSink and the UploadDataStreamAdapter on the Executor 37 * passed into its constructor. 38 */ 39 @JNINamespace("cronet") 40 @VisibleForTesting 41 public final class CronetUploadDataStream extends UploadDataSink { 42 private static final String TAG = CronetUploadDataStream.class.getSimpleName(); 43 // These are never changed, once a request starts. 44 private final Executor mExecutor; 45 private final VersionSafeCallbacks.UploadDataProviderWrapper mDataProvider; 46 private final CronetUrlRequest mRequest; 47 private long mLength; 48 private long mRemainingLength; 49 private long mByteBufferLimit; 50 51 // Reusable read task, to reduce redundant memory allocation. 52 private final Runnable mReadTask = 53 new Runnable() { 54 @Override 55 public void run() { 56 synchronized (mLock) { 57 if (mUploadDataStreamAdapter == 0) { 58 return; 59 } 60 checkState(UserCallback.NOT_IN_CALLBACK); 61 if (mByteBuffer == null) { 62 throw new IllegalStateException( 63 "Unexpected readData call. Buffer is null"); 64 } 65 mInWhichUserCallback = UserCallback.READ; 66 } 67 try { 68 checkCallingThread(); 69 assert mByteBuffer.position() == 0; 70 mDataProvider.read(CronetUploadDataStream.this, mByteBuffer); 71 } catch (Exception exception) { 72 onError(exception); 73 } 74 } 75 }; 76 77 // ByteBuffer created in the native code and passed to 78 // UploadDataProvider for reading. It is only valid from the 79 // call to mDataProvider.read until onError or onReadSucceeded. 80 private ByteBuffer mByteBuffer; 81 82 // Lock that protects all subsequent variables. The adapter has to be 83 // protected to ensure safe shutdown, mReading and mRewinding are protected 84 // to robustly detect getting read/rewind results more often than expected. 85 private final Object mLock = new Object(); 86 87 // Native adapter object, owned by the CronetUploadDataStream. It's only 88 // deleted after the native UploadDataStream object is destroyed. All access 89 // to the adapter is synchronized, for safe usage and cleanup. 90 @GuardedBy("mLock") 91 private long mUploadDataStreamAdapter; 92 93 @IntDef({ 94 UserCallback.READ, 95 UserCallback.REWIND, 96 UserCallback.GET_LENGTH, 97 UserCallback.NOT_IN_CALLBACK 98 }) 99 @Retention(RetentionPolicy.SOURCE) 100 private @interface UserCallback { 101 int READ = 0; 102 int REWIND = 1; 103 int GET_LENGTH = 2; 104 int NOT_IN_CALLBACK = 3; 105 } 106 107 @GuardedBy("mLock") 108 private @UserCallback int mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK; 109 110 @GuardedBy("mLock") 111 private boolean mDestroyAdapterPostponed; 112 113 private Runnable mOnDestroyedCallbackForTesting; 114 115 /** 116 * Constructs a CronetUploadDataStream. 117 * @param dataProvider the UploadDataProvider to read data from. 118 * @param executor the Executor to execute UploadDataProvider tasks. 119 */ CronetUploadDataStream( UploadDataProvider dataProvider, Executor executor, CronetUrlRequest request)120 public CronetUploadDataStream( 121 UploadDataProvider dataProvider, Executor executor, CronetUrlRequest request) { 122 mExecutor = executor; 123 mDataProvider = new VersionSafeCallbacks.UploadDataProviderWrapper(dataProvider); 124 mRequest = request; 125 } 126 127 /** 128 * Called by native code to make the UploadDataProvider read data into 129 * {@code byteBuffer}. 130 */ 131 @SuppressWarnings("unused") 132 @CalledByNative readData(ByteBuffer byteBuffer)133 void readData(ByteBuffer byteBuffer) { 134 mByteBuffer = byteBuffer; 135 mByteBufferLimit = byteBuffer.limit(); 136 postTaskToExecutor(mReadTask); 137 } 138 139 // TODO(mmenke): Consider implementing a cancel method. 140 // currently wait for any pending read to complete. 141 142 /** Called by native code to make the UploadDataProvider rewind upload data. */ 143 @SuppressWarnings("unused") 144 @CalledByNative rewind()145 void rewind() { 146 Runnable task = 147 new Runnable() { 148 @Override 149 public void run() { 150 synchronized (mLock) { 151 if (mUploadDataStreamAdapter == 0) { 152 return; 153 } 154 checkState(UserCallback.NOT_IN_CALLBACK); 155 mInWhichUserCallback = UserCallback.REWIND; 156 } 157 try { 158 checkCallingThread(); 159 mDataProvider.rewind(CronetUploadDataStream.this); 160 } catch (Exception exception) { 161 onError(exception); 162 } 163 } 164 }; 165 postTaskToExecutor(task); 166 } 167 checkCallingThread()168 private void checkCallingThread() { 169 mRequest.checkCallingThread(); 170 } 171 172 @GuardedBy("mLock") checkState(@serCallback int mode)173 private void checkState(@UserCallback int mode) { 174 if (mInWhichUserCallback != mode) { 175 throw new IllegalStateException( 176 "Expected " + mode + ", but was " + mInWhichUserCallback); 177 } 178 } 179 180 /** 181 * Called when the native UploadDataStream is destroyed. At this point, 182 * the native adapter needs to be destroyed, but only after any pending 183 * read operation completes, as the adapter owns the read buffer. 184 */ 185 @SuppressWarnings("unused") 186 @CalledByNative onUploadDataStreamDestroyed()187 void onUploadDataStreamDestroyed() { 188 destroyAdapter(); 189 } 190 191 /** 192 * Helper method called when an exception occurred. This method resets 193 * states and propagates the error to the request. 194 */ onError(Throwable exception)195 private void onError(Throwable exception) { 196 final boolean sendClose; 197 synchronized (mLock) { 198 if (mInWhichUserCallback == UserCallback.NOT_IN_CALLBACK) { 199 throw new IllegalStateException( 200 "There is no read or rewind or length check in progress.", exception); 201 } 202 sendClose = mInWhichUserCallback == UserCallback.GET_LENGTH; 203 mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK; 204 mByteBuffer = null; 205 destroyAdapterIfPostponed(); 206 } 207 // Failure before length is obtained means that the request has failed before the 208 // adapter has been initialized. Close the UploadDataProvider. This is safe to call 209 // here since failure during getLength can only happen on the user's executor. 210 if (sendClose) { 211 try { 212 mDataProvider.close(); 213 } catch (Exception e) { 214 Log.e(TAG, "Failure closing data provider", e); 215 } 216 } 217 218 // Just fail the request - simpler to fail directly, and 219 // UploadDataStream only supports failing during initialization, not 220 // while reading. The request is smart enough to handle the case where 221 // it was already canceled by the embedder. 222 mRequest.onUploadException(exception); 223 } 224 225 @Override 226 @SuppressLint("DefaultLocale") onReadSucceeded(boolean lastChunk)227 public void onReadSucceeded(boolean lastChunk) { 228 synchronized (mLock) { 229 checkState(UserCallback.READ); 230 if (mByteBufferLimit != mByteBuffer.limit()) { 231 throw new IllegalStateException("ByteBuffer limit changed"); 232 } 233 if (lastChunk && mLength >= 0) { 234 throw new IllegalArgumentException("Non-chunked upload can't have last chunk"); 235 } 236 int bytesRead = mByteBuffer.position(); 237 mRemainingLength -= bytesRead; 238 if (mRemainingLength < 0 && mLength >= 0) { 239 throw new IllegalArgumentException( 240 String.format( 241 "Read upload data length %d exceeds expected length %d", 242 mLength - mRemainingLength, mLength)); 243 } 244 mByteBuffer.position(0); 245 mByteBuffer = null; 246 mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK; 247 248 destroyAdapterIfPostponed(); 249 // Request may been canceled already. 250 if (mUploadDataStreamAdapter == 0) { 251 return; 252 } 253 CronetUploadDataStreamJni.get() 254 .onReadSucceeded( 255 mUploadDataStreamAdapter, 256 CronetUploadDataStream.this, 257 bytesRead, 258 lastChunk); 259 } 260 } 261 262 @Override onReadError(Exception exception)263 public void onReadError(Exception exception) { 264 synchronized (mLock) { 265 checkState(UserCallback.READ); 266 onError(exception); 267 } 268 } 269 270 @Override onRewindSucceeded()271 public void onRewindSucceeded() { 272 synchronized (mLock) { 273 checkState(UserCallback.REWIND); 274 mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK; 275 mRemainingLength = mLength; 276 // Request may been canceled already. 277 if (mUploadDataStreamAdapter == 0) { 278 return; 279 } 280 CronetUploadDataStreamJni.get() 281 .onRewindSucceeded(mUploadDataStreamAdapter, CronetUploadDataStream.this); 282 } 283 } 284 285 @Override onRewindError(Exception exception)286 public void onRewindError(Exception exception) { 287 synchronized (mLock) { 288 checkState(UserCallback.REWIND); 289 onError(exception); 290 } 291 } 292 293 /** Posts task to application Executor. */ postTaskToExecutor(Runnable task)294 void postTaskToExecutor(Runnable task) { 295 try { 296 mExecutor.execute(task); 297 } catch (Throwable e) { 298 // Just fail the request. The request is smart enough to handle the 299 // case where it was already canceled by the embedder. 300 mRequest.onUploadException(e); 301 } 302 } 303 304 /** 305 * The adapter is owned by the CronetUploadDataStream, so it can be 306 * destroyed safely when there is no pending read; however, destruction is 307 * initiated by the destruction of the native UploadDataStream. 308 */ destroyAdapter()309 private void destroyAdapter() { 310 synchronized (mLock) { 311 if (mInWhichUserCallback == UserCallback.READ) { 312 // Wait for the read to complete before destroy the adapter. 313 mDestroyAdapterPostponed = true; 314 return; 315 } 316 if (mUploadDataStreamAdapter == 0) { 317 return; 318 } 319 CronetUploadDataStreamJni.get().destroy(mUploadDataStreamAdapter); 320 mUploadDataStreamAdapter = 0; 321 if (mOnDestroyedCallbackForTesting != null) { 322 mOnDestroyedCallbackForTesting.run(); 323 } 324 } 325 postTaskToExecutor( 326 new Runnable() { 327 @Override 328 public void run() { 329 try { 330 checkCallingThread(); 331 mDataProvider.close(); 332 } catch (Exception e) { 333 Log.e(TAG, "Exception thrown when closing", e); 334 } 335 } 336 }); 337 } 338 339 /** 340 * Destroys the native adapter if the destruction is postponed due to a 341 * pending read, which has since completed. Caller needs to be on executor 342 * thread. 343 */ destroyAdapterIfPostponed()344 private void destroyAdapterIfPostponed() { 345 synchronized (mLock) { 346 if (mInWhichUserCallback == UserCallback.READ) { 347 throw new IllegalStateException( 348 "Method should not be called when read has not completed."); 349 } 350 if (mDestroyAdapterPostponed) { 351 destroyAdapter(); 352 } 353 } 354 } 355 356 /** 357 * Initializes upload length by getting it from data provider. Submits to 358 * the user's executor thread to allow getLength() to block and/or report errors. 359 * If data provider throws an exception, then it is reported to the request. 360 * No native calls to urlRequest are allowed as this is done before request 361 * start, so native object may not exist. 362 */ initializeWithRequest()363 void initializeWithRequest() { 364 synchronized (mLock) { 365 mInWhichUserCallback = UserCallback.GET_LENGTH; 366 } 367 try { 368 mRequest.checkCallingThread(); 369 mLength = mDataProvider.getLength(); 370 mRemainingLength = mLength; 371 } catch (Throwable t) { 372 onError(t); 373 } 374 synchronized (mLock) { 375 mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK; 376 } 377 } 378 379 /** 380 * Creates native objects and attaches them to the underlying request 381 * adapter object. Always called on executor thread. 382 */ attachNativeAdapterToRequest(final long requestAdapter)383 void attachNativeAdapterToRequest(final long requestAdapter) { 384 synchronized (mLock) { 385 mUploadDataStreamAdapter = 386 CronetUploadDataStreamJni.get() 387 .attachUploadDataToRequest( 388 CronetUploadDataStream.this, requestAdapter, mLength); 389 } 390 } 391 392 /** 393 * Creates a native CronetUploadDataStreamAdapter and 394 * CronetUploadDataStream for testing. 395 * @return the address of the native CronetUploadDataStream object. 396 */ createUploadDataStreamForTesting()397 public long createUploadDataStreamForTesting() throws IOException { 398 synchronized (mLock) { 399 mUploadDataStreamAdapter = 400 CronetUploadDataStreamJni.get() 401 .createAdapterForTesting(CronetUploadDataStream.this); 402 mLength = mDataProvider.getLength(); 403 mRemainingLength = mLength; 404 return CronetUploadDataStreamJni.get() 405 .createUploadDataStreamForTesting( 406 CronetUploadDataStream.this, mLength, mUploadDataStreamAdapter); 407 } 408 } 409 410 @VisibleForTesting setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting)411 public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) { 412 mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting; 413 } 414 415 // Native methods are implemented in upload_data_stream_adapter.cc. 416 @NativeMethods 417 interface Natives { attachUploadDataToRequest( CronetUploadDataStream caller, long urlRequestAdapter, long length)418 long attachUploadDataToRequest( 419 CronetUploadDataStream caller, long urlRequestAdapter, long length); 420 createAdapterForTesting(CronetUploadDataStream caller)421 long createAdapterForTesting(CronetUploadDataStream caller); 422 createUploadDataStreamForTesting( CronetUploadDataStream caller, long length, long adapter)423 long createUploadDataStreamForTesting( 424 CronetUploadDataStream caller, long length, long adapter); 425 426 @NativeClassQualifiedName("CronetUploadDataStreamAdapter") onReadSucceeded( long nativePtr, CronetUploadDataStream caller, int bytesRead, boolean finalChunk)427 void onReadSucceeded( 428 long nativePtr, CronetUploadDataStream caller, int bytesRead, boolean finalChunk); 429 430 @NativeClassQualifiedName("CronetUploadDataStreamAdapter") onRewindSucceeded(long nativePtr, CronetUploadDataStream caller)431 void onRewindSucceeded(long nativePtr, CronetUploadDataStream caller); 432 433 @NativeClassQualifiedName("CronetUploadDataStreamAdapter") destroy(long nativePtr)434 void destroy(long nativePtr); 435 } 436 } 437