1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.net.impl; 6 7 import androidx.annotation.IntDef; 8 9 import org.chromium.net.UploadDataProvider; 10 import org.chromium.net.UploadDataSink; 11 12 import java.io.IOException; 13 import java.lang.annotation.Retention; 14 import java.lang.annotation.RetentionPolicy; 15 import java.nio.ByteBuffer; 16 import java.util.Locale; 17 import java.util.concurrent.Executor; 18 import java.util.concurrent.RejectedExecutionException; 19 import java.util.concurrent.atomic.AtomicInteger; 20 21 /** 22 * Base class for Java UrlRequest implementations of UploadDataSink. Handles asynchronicity and 23 * manages the executors for this upload. 24 */ 25 public abstract class JavaUploadDataSinkBase extends UploadDataSink { 26 @IntDef({ 27 SinkState.AWAITING_READ_RESULT, 28 SinkState.AWAITING_REWIND_RESULT, 29 SinkState.UPLOADING, 30 SinkState.NOT_STARTED 31 }) 32 @Retention(RetentionPolicy.SOURCE) 33 @interface SinkState { 34 int AWAITING_READ_RESULT = 0; 35 int AWAITING_REWIND_RESULT = 1; 36 int UPLOADING = 2; 37 int NOT_STARTED = 3; 38 } 39 40 public static final int DEFAULT_UPLOAD_BUFFER_SIZE = 8192; 41 42 private final AtomicInteger /*SinkState*/ mSinkState = new AtomicInteger(SinkState.NOT_STARTED); 43 private final Executor mUserUploadExecutor; 44 private final Executor mExecutor; 45 private final VersionSafeCallbacks.UploadDataProviderWrapper mUploadProvider; 46 private ByteBuffer mBuffer; 47 48 /** This holds the total bytes to send (the content-length). -1 if unknown. */ 49 private long mTotalBytes; 50 51 /** This holds the bytes written so far */ 52 private long mWrittenBytes; 53 JavaUploadDataSinkBase( final Executor userExecutor, Executor executor, UploadDataProvider provider)54 public JavaUploadDataSinkBase( 55 final Executor userExecutor, Executor executor, UploadDataProvider provider) { 56 mUserUploadExecutor = 57 new Executor() { 58 @Override 59 public void execute(Runnable runnable) { 60 try { 61 userExecutor.execute(runnable); 62 } catch (RejectedExecutionException e) { 63 processUploadError(e); 64 } 65 } 66 }; 67 mExecutor = executor; 68 mUploadProvider = new VersionSafeCallbacks.UploadDataProviderWrapper(provider); 69 } 70 71 @Override onReadSucceeded(final boolean finalChunk)72 public void onReadSucceeded(final boolean finalChunk) { 73 if (!mSinkState.compareAndSet( 74 /* expected= */ SinkState.AWAITING_READ_RESULT, 75 /* updated= */ SinkState.UPLOADING)) { 76 throw new IllegalStateException( 77 "onReadSucceeded() called when not awaiting a read result; in state: " 78 + mSinkState.get()); 79 } 80 JavaUrlRequestUtils.CheckedRunnable checkedRunnable = 81 () -> { 82 mBuffer.flip(); 83 if (mTotalBytes != -1 && mTotalBytes - mWrittenBytes < mBuffer.remaining()) { 84 String msg = 85 String.format( 86 Locale.getDefault(), 87 "Read upload data length %d exceeds expected length %d", 88 mWrittenBytes + mBuffer.remaining(), 89 mTotalBytes); 90 processUploadError(new IllegalArgumentException(msg)); 91 return; 92 } 93 94 mWrittenBytes += processSuccessfulRead(mBuffer); 95 96 if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !finalChunk)) { 97 mBuffer.clear(); 98 mSinkState.set(SinkState.AWAITING_READ_RESULT); 99 executeOnUploadExecutor( 100 () -> { 101 mUploadProvider.read(JavaUploadDataSinkBase.this, mBuffer); 102 }); 103 } else if (mTotalBytes == -1) { 104 finish(); 105 } else if (mTotalBytes == mWrittenBytes) { 106 finish(); 107 } else { 108 String msg = 109 String.format( 110 Locale.getDefault(), 111 "Read upload data length %d exceeds expected length %d", 112 mWrittenBytes, 113 mTotalBytes); 114 processUploadError(new IllegalArgumentException(msg)); 115 } 116 }; 117 mExecutor.execute(getErrorSettingRunnable(checkedRunnable)); 118 } 119 120 @Override onRewindSucceeded()121 public void onRewindSucceeded() { 122 if (!mSinkState.compareAndSet( 123 /* expected= */ SinkState.AWAITING_REWIND_RESULT, 124 /* updated= */ SinkState.UPLOADING)) { 125 throw new IllegalStateException( 126 "onRewindSucceeded() called when not awaiting a rewind; in state: " 127 + mSinkState.get()); 128 } 129 startRead(); 130 } 131 132 @Override onReadError(Exception exception)133 public void onReadError(Exception exception) { 134 processUploadError(exception); 135 } 136 137 @Override onRewindError(Exception exception)138 public void onRewindError(Exception exception) { 139 processUploadError(exception); 140 } 141 startRead()142 private void startRead() { 143 mExecutor.execute( 144 getErrorSettingRunnable( 145 () -> { 146 initializeRead(); 147 mSinkState.set(SinkState.AWAITING_READ_RESULT); 148 executeOnUploadExecutor( 149 () -> { 150 mUploadProvider.read(JavaUploadDataSinkBase.this, mBuffer); 151 }); 152 })); 153 } 154 155 /** 156 * Helper method to execute a checked runnable on the upload executor and process any errors 157 * that occur as upload errors. 158 * 159 * @param runnable the runnable to attempt to run and check for errors 160 */ executeOnUploadExecutor(JavaUrlRequestUtils.CheckedRunnable runnable)161 private void executeOnUploadExecutor(JavaUrlRequestUtils.CheckedRunnable runnable) { 162 try { 163 mUserUploadExecutor.execute(getUploadErrorSettingRunnable(runnable)); 164 } catch (RejectedExecutionException e) { 165 processUploadError(e); 166 } 167 } 168 169 /** 170 * Starts the upload. This method can be called multiple times. If it is not the first time it 171 * is called the {@link UploadDataProvider} must rewind. 172 * 173 * @param firstTime true if this is the first time this {@link UploadDataSink} has started an 174 * upload 175 */ start(final boolean firstTime)176 public void start(final boolean firstTime) { 177 executeOnUploadExecutor( 178 () -> { 179 mTotalBytes = mUploadProvider.getLength(); 180 if (mTotalBytes == 0) { 181 finish(); 182 } else { 183 // If we know how much data we have to upload, and it's small, we can 184 // save memory by allocating a reasonably sized buffer to read into. 185 if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFFER_SIZE) { 186 // Allocate one byte more than necessary, to detect callers 187 // uploading more bytes than they specified in length. 188 mBuffer = ByteBuffer.allocateDirect((int) mTotalBytes + 1); 189 } else { 190 mBuffer = ByteBuffer.allocateDirect(DEFAULT_UPLOAD_BUFFER_SIZE); 191 } 192 193 initializeStart(mTotalBytes); 194 195 if (firstTime) { 196 startRead(); 197 } else { 198 mSinkState.set(SinkState.AWAITING_REWIND_RESULT); 199 mUploadProvider.rewind(JavaUploadDataSinkBase.this); 200 } 201 } 202 }); 203 } 204 205 /** 206 * Gets a runnable that checks for errors and processes them by setting an error state when 207 * executing a {@link CheckedRunnable}. 208 * 209 * @param runnable The runnable to run. 210 * @return a runnable that checks for errors 211 */ getErrorSettingRunnable( JavaUrlRequestUtils.CheckedRunnable runnable)212 protected abstract Runnable getErrorSettingRunnable( 213 JavaUrlRequestUtils.CheckedRunnable runnable); 214 215 /** 216 * Gets a runnable that checks for errors and processes them by setting an upload error state 217 * when executing a {@link CheckedRunnable}. 218 * 219 * @param runnable The runnable to run. 220 * @return a runnable that checks for errors 221 */ getUploadErrorSettingRunnable( JavaUrlRequestUtils.CheckedRunnable runnable)222 protected abstract Runnable getUploadErrorSettingRunnable( 223 JavaUrlRequestUtils.CheckedRunnable runnable); 224 225 /** 226 * Processes an error encountered while uploading data. 227 * 228 * @param error the {@link Throwable} to process 229 */ processUploadError(final Throwable error)230 protected abstract void processUploadError(final Throwable error); 231 232 /** 233 * Called when a successful read has occurred and there is new data in the {@code mBuffer} to 234 * process. 235 * 236 * @return the number of bytes processed in this read 237 * @throws IOException 238 */ processSuccessfulRead(ByteBuffer buffer)239 protected abstract int processSuccessfulRead(ByteBuffer buffer) throws IOException; 240 241 /** 242 * Finishes this upload. Called when the upload is complete. 243 * 244 * @throws IOException 245 */ finish()246 protected abstract void finish() throws IOException; 247 248 /** 249 * Initializes the {@link UploadDataSink} before each call to {@code read} in the 250 * {@link UploadDataProvider}. 251 * 252 * @throws IOException 253 */ initializeRead()254 protected abstract void initializeRead() throws IOException; 255 256 /** 257 * Initializes the {@link UploadDataSink} at the start of the upload. 258 * 259 * @param totalBytes the total number of bytes to be retrieved in this upload 260 */ initializeStart(long totalBytes)261 protected abstract void initializeStart(long totalBytes); 262 } 263