• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net.impl;
6 
7 import androidx.annotation.IntDef;
8 
9 import org.chromium.net.UploadDataProvider;
10 import org.chromium.net.UploadDataSink;
11 
12 import java.io.IOException;
13 import java.lang.annotation.Retention;
14 import java.lang.annotation.RetentionPolicy;
15 import java.nio.ByteBuffer;
16 import java.util.Locale;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.atomic.AtomicInteger;
20 
21 /**
22  * Base class for Java UrlRequest implementations of UploadDataSink. Handles asynchronicity and
23  * manages the executors for this upload.
24  */
25 public abstract class JavaUploadDataSinkBase extends UploadDataSink {
26     @IntDef({
27         SinkState.AWAITING_READ_RESULT,
28         SinkState.AWAITING_REWIND_RESULT,
29         SinkState.UPLOADING,
30         SinkState.NOT_STARTED
31     })
32     @Retention(RetentionPolicy.SOURCE)
33     @interface SinkState {
34         int AWAITING_READ_RESULT = 0;
35         int AWAITING_REWIND_RESULT = 1;
36         int UPLOADING = 2;
37         int NOT_STARTED = 3;
38     }
39 
40     public static final int DEFAULT_UPLOAD_BUFFER_SIZE = 8192;
41 
42     private final AtomicInteger /*SinkState*/ mSinkState = new AtomicInteger(SinkState.NOT_STARTED);
43     private final Executor mUserUploadExecutor;
44     private final Executor mExecutor;
45     private final VersionSafeCallbacks.UploadDataProviderWrapper mUploadProvider;
46     private ByteBuffer mBuffer;
47 
48     /** This holds the total bytes to send (the content-length). -1 if unknown. */
49     private long mTotalBytes;
50 
51     /** This holds the bytes written so far */
52     private long mWrittenBytes;
53 
JavaUploadDataSinkBase( final Executor userExecutor, Executor executor, UploadDataProvider provider)54     public JavaUploadDataSinkBase(
55             final Executor userExecutor, Executor executor, UploadDataProvider provider) {
56         mUserUploadExecutor =
57                 new Executor() {
58                     @Override
59                     public void execute(Runnable runnable) {
60                         try {
61                             userExecutor.execute(runnable);
62                         } catch (RejectedExecutionException e) {
63                             processUploadError(e);
64                         }
65                     }
66                 };
67         mExecutor = executor;
68         mUploadProvider = new VersionSafeCallbacks.UploadDataProviderWrapper(provider);
69     }
70 
71     @Override
onReadSucceeded(final boolean finalChunk)72     public void onReadSucceeded(final boolean finalChunk) {
73         if (!mSinkState.compareAndSet(
74                 /* expected= */ SinkState.AWAITING_READ_RESULT,
75                 /* updated= */ SinkState.UPLOADING)) {
76             throw new IllegalStateException(
77                     "onReadSucceeded() called when not awaiting a read result; in state: "
78                             + mSinkState.get());
79         }
80         JavaUrlRequestUtils.CheckedRunnable checkedRunnable =
81                 () -> {
82                     mBuffer.flip();
83                     if (mTotalBytes != -1 && mTotalBytes - mWrittenBytes < mBuffer.remaining()) {
84                         String msg =
85                                 String.format(
86                                         Locale.getDefault(),
87                                         "Read upload data length %d exceeds expected length %d",
88                                         mWrittenBytes + mBuffer.remaining(),
89                                         mTotalBytes);
90                         processUploadError(new IllegalArgumentException(msg));
91                         return;
92                     }
93 
94                     mWrittenBytes += processSuccessfulRead(mBuffer);
95 
96                     if (mWrittenBytes < mTotalBytes || (mTotalBytes == -1 && !finalChunk)) {
97                         mBuffer.clear();
98                         mSinkState.set(SinkState.AWAITING_READ_RESULT);
99                         executeOnUploadExecutor(
100                                 () -> {
101                                     mUploadProvider.read(JavaUploadDataSinkBase.this, mBuffer);
102                                 });
103                     } else if (mTotalBytes == -1) {
104                         finish();
105                     } else if (mTotalBytes == mWrittenBytes) {
106                         finish();
107                     } else {
108                         String msg =
109                                 String.format(
110                                         Locale.getDefault(),
111                                         "Read upload data length %d exceeds expected length %d",
112                                         mWrittenBytes,
113                                         mTotalBytes);
114                         processUploadError(new IllegalArgumentException(msg));
115                     }
116                 };
117         mExecutor.execute(getErrorSettingRunnable(checkedRunnable));
118     }
119 
120     @Override
onRewindSucceeded()121     public void onRewindSucceeded() {
122         if (!mSinkState.compareAndSet(
123                 /* expected= */ SinkState.AWAITING_REWIND_RESULT,
124                 /* updated= */ SinkState.UPLOADING)) {
125             throw new IllegalStateException(
126                     "onRewindSucceeded() called when not awaiting a rewind; in state: "
127                             + mSinkState.get());
128         }
129         startRead();
130     }
131 
132     @Override
onReadError(Exception exception)133     public void onReadError(Exception exception) {
134         processUploadError(exception);
135     }
136 
137     @Override
onRewindError(Exception exception)138     public void onRewindError(Exception exception) {
139         processUploadError(exception);
140     }
141 
startRead()142     private void startRead() {
143         mExecutor.execute(
144                 getErrorSettingRunnable(
145                         () -> {
146                             initializeRead();
147                             mSinkState.set(SinkState.AWAITING_READ_RESULT);
148                             executeOnUploadExecutor(
149                                     () -> {
150                                         mUploadProvider.read(JavaUploadDataSinkBase.this, mBuffer);
151                                     });
152                         }));
153     }
154 
155     /**
156      * Helper method to execute a checked runnable on the upload executor and process any errors
157      * that occur as upload errors.
158      *
159      * @param runnable the runnable to attempt to run and check for errors
160      */
executeOnUploadExecutor(JavaUrlRequestUtils.CheckedRunnable runnable)161     private void executeOnUploadExecutor(JavaUrlRequestUtils.CheckedRunnable runnable) {
162         try {
163             mUserUploadExecutor.execute(getUploadErrorSettingRunnable(runnable));
164         } catch (RejectedExecutionException e) {
165             processUploadError(e);
166         }
167     }
168 
169     /**
170      * Starts the upload. This method can be called multiple times. If it is not the first time it
171      * is called the {@link UploadDataProvider} must rewind.
172      *
173      * @param firstTime true if this is the first time this {@link UploadDataSink} has started an
174      *                  upload
175      */
start(final boolean firstTime)176     public void start(final boolean firstTime) {
177         executeOnUploadExecutor(
178                 () -> {
179                     mTotalBytes = mUploadProvider.getLength();
180                     if (mTotalBytes == 0) {
181                         finish();
182                     } else {
183                         // If we know how much data we have to upload, and it's small, we can
184                         // save memory by allocating a reasonably sized buffer to read into.
185                         if (mTotalBytes > 0 && mTotalBytes < DEFAULT_UPLOAD_BUFFER_SIZE) {
186                             // Allocate one byte more than necessary, to detect callers
187                             // uploading more bytes than they specified in length.
188                             mBuffer = ByteBuffer.allocateDirect((int) mTotalBytes + 1);
189                         } else {
190                             mBuffer = ByteBuffer.allocateDirect(DEFAULT_UPLOAD_BUFFER_SIZE);
191                         }
192 
193                         initializeStart(mTotalBytes);
194 
195                         if (firstTime) {
196                             startRead();
197                         } else {
198                             mSinkState.set(SinkState.AWAITING_REWIND_RESULT);
199                             mUploadProvider.rewind(JavaUploadDataSinkBase.this);
200                         }
201                     }
202                 });
203     }
204 
205     /**
206      * Gets a runnable that checks for errors and processes them by setting an error state when
207      * executing a {@link CheckedRunnable}.
208      *
209      * @param runnable The runnable to run.
210      * @return a runnable that checks for errors
211      */
getErrorSettingRunnable( JavaUrlRequestUtils.CheckedRunnable runnable)212     protected abstract Runnable getErrorSettingRunnable(
213             JavaUrlRequestUtils.CheckedRunnable runnable);
214 
215     /**
216      * Gets a runnable that checks for errors and processes them by setting an upload error state
217      * when executing a {@link CheckedRunnable}.
218      *
219      * @param runnable The runnable to run.
220      * @return a runnable that checks for errors
221      */
getUploadErrorSettingRunnable( JavaUrlRequestUtils.CheckedRunnable runnable)222     protected abstract Runnable getUploadErrorSettingRunnable(
223             JavaUrlRequestUtils.CheckedRunnable runnable);
224 
225     /**
226      * Processes an error encountered while uploading data.
227      *
228      * @param error the {@link Throwable} to process
229      */
processUploadError(final Throwable error)230     protected abstract void processUploadError(final Throwable error);
231 
232     /**
233      * Called when a successful read has occurred and there is new data in the {@code mBuffer} to
234      * process.
235      *
236      * @return the number of bytes processed in this read
237      * @throws IOException
238      */
processSuccessfulRead(ByteBuffer buffer)239     protected abstract int processSuccessfulRead(ByteBuffer buffer) throws IOException;
240 
241     /**
242      * Finishes this upload. Called when the upload is complete.
243      *
244      * @throws IOException
245      */
finish()246     protected abstract void finish() throws IOException;
247 
248     /**
249      * Initializes the {@link UploadDataSink} before each call to {@code read} in the
250      * {@link UploadDataProvider}.
251      *
252      * @throws IOException
253      */
initializeRead()254     protected abstract void initializeRead() throws IOException;
255 
256     /**
257      * Initializes the {@link UploadDataSink} at the start of the upload.
258      *
259      * @param totalBytes the total number of bytes to be retrieved in this upload
260      */
initializeStart(long totalBytes)261     protected abstract void initializeStart(long totalBytes);
262 }
263